=str - renames ReactiveStreamsConstants to ReactiveStreamsCompliance
This commit is contained in:
parent
9ee2a1f882
commit
cb6da7a3d4
11 changed files with 34 additions and 38 deletions
|
|
@ -5,8 +5,7 @@ package akka.stream.actor
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import akka.actor.Cancellable
|
import akka.actor.Cancellable
|
||||||
import akka.stream.ReactiveStreamsConstants
|
import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport }
|
||||||
import akka.stream.impl.StreamSubscriptionTimeoutSupport
|
|
||||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||||
import akka.actor.AbstractActor
|
import akka.actor.AbstractActor
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
|
@ -234,7 +233,7 @@ trait ActorPublisher[T] extends Actor {
|
||||||
demand += n
|
demand += n
|
||||||
if (demand < 0 && lifecycleState == Active) {
|
if (demand < 0 && lifecycleState == Active) {
|
||||||
// Long has overflown
|
// Long has overflown
|
||||||
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
|
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)
|
||||||
onError(demandOverflowException)
|
onError(demandOverflowException)
|
||||||
} else
|
} else
|
||||||
super.aroundReceive(receive, msg)
|
super.aroundReceive(receive, msg)
|
||||||
|
|
@ -250,9 +249,9 @@ trait ActorPublisher[T] extends Actor {
|
||||||
case Completed ⇒ sub.onComplete()
|
case Completed ⇒ sub.onComplete()
|
||||||
case Active | Canceled ⇒
|
case Active | Canceled ⇒
|
||||||
if (subscriber == sub)
|
if (subscriber == sub)
|
||||||
sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
||||||
else
|
else
|
||||||
sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
|
sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
case Cancel ⇒
|
case Cancel ⇒
|
||||||
|
|
@ -343,7 +342,7 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri
|
||||||
import ActorPublisherMessage._
|
import ActorPublisherMessage._
|
||||||
|
|
||||||
override def request(n: Long): Unit = {
|
override def request(n: Long): Unit = {
|
||||||
ReactiveStreamsConstants.validateRequest(n)
|
ReactiveStreamsCompliance.validateRequest(n)
|
||||||
ref ! Request(n)
|
ref ! Request(n)
|
||||||
}
|
}
|
||||||
override def cancel(): Unit = ref ! Cancel
|
override def cancel(): Unit = ref ! Cancel
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.stream.impl
|
||||||
import java.util.Arrays
|
import java.util.Arrays
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
|
import akka.stream.MaterializerSettings
|
||||||
import akka.stream.actor.ActorSubscriber.OnSubscribe
|
import akka.stream.actor.ActorSubscriber.OnSubscribe
|
||||||
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
|
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
|
||||||
import org.reactivestreams.{ Subscriber, Subscription, Processor }
|
import org.reactivestreams.{ Subscriber, Subscription, Processor }
|
||||||
|
|
@ -186,7 +186,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
|
||||||
if (subscriber eq null) {
|
if (subscriber eq null) {
|
||||||
subscriber = sub
|
subscriber = sub
|
||||||
subscriber.onSubscribe(createSubscription())
|
subscriber.onSubscribe(createSubscription())
|
||||||
} else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
|
} else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def waitingExposedPublisher: Actor.Receive = {
|
protected def waitingExposedPublisher: Actor.Receive = {
|
||||||
|
|
@ -205,7 +205,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
|
||||||
downstreamDemand += elements
|
downstreamDemand += elements
|
||||||
if (downstreamDemand < 0) {
|
if (downstreamDemand < 0) {
|
||||||
// Long has overflown
|
// Long has overflown
|
||||||
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
|
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)
|
||||||
cancel(demandOverflowException)
|
cancel(demandOverflowException)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||||
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
|
import akka.stream.MaterializerSettings
|
||||||
import org.reactivestreams.{ Publisher, Subscriber }
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
import org.reactivestreams.Subscription
|
import org.reactivestreams.Subscription
|
||||||
|
|
||||||
|
|
@ -90,7 +90,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
|
||||||
*/
|
*/
|
||||||
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription {
|
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription {
|
||||||
override def request(elements: Long): Unit =
|
override def request(elements: Long): Unit =
|
||||||
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
|
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
|
||||||
else impl ! RequestMore(this, elements)
|
else impl ! RequestMore(this, elements)
|
||||||
override def cancel(): Unit = impl ! Cancel(this)
|
override def cancel(): Unit = impl ! Cancel(this)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ import akka.actor.Props
|
||||||
import akka.actor.Status
|
import akka.actor.Status
|
||||||
import akka.actor.SupervisorStrategy
|
import akka.actor.SupervisorStrategy
|
||||||
import akka.stream.MaterializerSettings
|
import akka.stream.MaterializerSettings
|
||||||
import akka.stream.ReactiveStreamsConstants
|
|
||||||
import akka.pattern.pipe
|
import akka.pattern.pipe
|
||||||
import org.reactivestreams.Subscriber
|
import org.reactivestreams.Subscriber
|
||||||
import org.reactivestreams.Subscription
|
import org.reactivestreams.Subscription
|
||||||
|
|
@ -35,7 +35,7 @@ private[akka] object FuturePublisher {
|
||||||
import akka.stream.impl.FuturePublisher.FutureSubscription._
|
import akka.stream.impl.FuturePublisher.FutureSubscription._
|
||||||
def cancel(): Unit = ref ! Cancel(this)
|
def cancel(): Unit = ref ! Cancel(this)
|
||||||
def request(elements: Long): Unit =
|
def request(elements: Long): Unit =
|
||||||
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
|
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
|
||||||
else ref ! RequestMore(this)
|
else ref ! RequestMore(this)
|
||||||
override def toString = "FutureSubscription"
|
override def toString = "FutureSubscription"
|
||||||
}
|
}
|
||||||
|
|
@ -107,7 +107,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
|
||||||
|
|
||||||
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
|
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
|
||||||
if (subscribers.contains(subscriber))
|
if (subscribers.contains(subscriber))
|
||||||
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
||||||
else {
|
else {
|
||||||
val subscription = new FutureSubscription(self)
|
val subscription = new FutureSubscription(self)
|
||||||
subscribers = subscribers.updated(subscriber, subscription)
|
subscribers = subscribers.updated(subscriber, subscription)
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.actor.Actor
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.stream.MaterializerSettings
|
import akka.stream.MaterializerSettings
|
||||||
import akka.stream.ReactiveStreamsConstants
|
|
||||||
import org.reactivestreams.Subscriber
|
import org.reactivestreams.Subscriber
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -36,7 +36,7 @@ private[akka] object IteratorPublisher {
|
||||||
*/
|
*/
|
||||||
private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: MaterializerSettings) extends Actor {
|
private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: MaterializerSettings) extends Actor {
|
||||||
import IteratorPublisher._
|
import IteratorPublisher._
|
||||||
import ReactiveStreamsConstants._
|
import ReactiveStreamsCompliance._
|
||||||
|
|
||||||
private var exposedPublisher: ActorPublisher[Any] = _
|
private var exposedPublisher: ActorPublisher[Any] = _
|
||||||
private var subscriber: Subscriber[Any] = _
|
private var subscriber: Subscriber[Any] = _
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,12 @@
|
||||||
/**
|
package akka.stream.impl
|
||||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.stream
|
|
||||||
|
|
||||||
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
|
|
||||||
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
import org.reactivestreams.{ Subscriber, Publisher, Subscription }
|
||||||
|
|
||||||
object ReactiveStreamsConstants {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[stream] object ReactiveStreamsCompliance {
|
||||||
|
|
||||||
final val CanNotSubscribeTheSameSubscriberMultipleTimes =
|
final val CanNotSubscribeTheSameSubscriberMultipleTimes =
|
||||||
"can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)"
|
"can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)"
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.stream.impl
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import akka.actor.ActorLogging
|
import akka.actor.ActorLogging
|
||||||
import akka.actor.Cancellable
|
import akka.actor.Cancellable
|
||||||
import akka.stream.ReactiveStreamsConstants
|
|
||||||
import akka.actor.{ Actor, ActorRef }
|
import akka.actor.{ Actor, ActorRef }
|
||||||
import akka.stream.MaterializerSettings
|
import akka.stream.MaterializerSettings
|
||||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||||
|
|
@ -25,7 +25,7 @@ private[akka] object MultiStreamOutputProcessor {
|
||||||
|
|
||||||
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
|
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
|
||||||
override def request(elements: Long): Unit =
|
override def request(elements: Long): Unit =
|
||||||
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
|
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
|
||||||
else parent ! SubstreamRequestMore(substreamKey, elements)
|
else parent ! SubstreamRequestMore(substreamKey, elements)
|
||||||
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
|
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
|
||||||
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
|
override def toString = "SubstreamSubscription" + System.identityHashCode(this)
|
||||||
|
|
@ -91,7 +91,7 @@ private[akka] object MultiStreamOutputProcessor {
|
||||||
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
|
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
|
||||||
else {
|
else {
|
||||||
state.get() match {
|
state.get() match {
|
||||||
case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
|
case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber))
|
||||||
case c: CompletedState ⇒ closeSubscriber(s, c)
|
case c: CompletedState ⇒ closeSubscriber(s, c)
|
||||||
case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before")
|
case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,6 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
import akka.stream.ReactiveStreamsConstants
|
|
||||||
|
|
||||||
import scala.annotation.switch
|
import scala.annotation.switch
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import org.reactivestreams.{ Subscriber, Subscription }
|
import org.reactivestreams.{ Subscriber, Subscription }
|
||||||
|
|
@ -49,7 +47,7 @@ private[akka] trait SubscriptionWithCursor[T] extends Subscription with Resizabl
|
||||||
val noOverflow = sum > 0
|
val noOverflow = sum > 0
|
||||||
|
|
||||||
if (noOverflow) totalDemand = sum
|
if (noOverflow) totalDemand = sum
|
||||||
else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue}"))
|
else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue}"))
|
||||||
|
|
||||||
sum
|
sum
|
||||||
}
|
}
|
||||||
|
|
@ -215,7 +213,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
||||||
*/
|
*/
|
||||||
protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
|
protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
|
||||||
case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒
|
case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒
|
||||||
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
||||||
case NotReached ⇒
|
case NotReached ⇒
|
||||||
val newSubscription = createSubscription(subscriber)
|
val newSubscription = createSubscription(subscriber)
|
||||||
subscriptions ::= newSubscription
|
subscriptions ::= newSubscription
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
import akka.dispatch.ExecutionContexts
|
import akka.dispatch.ExecutionContexts
|
||||||
import akka.stream.ReactiveStreamsConstants
|
|
||||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
@ -46,7 +46,7 @@ private[akka] object SynchronousIterablePublisher {
|
||||||
done = true
|
done = true
|
||||||
|
|
||||||
override def request(elements: Long): Unit = {
|
override def request(elements: Long): Unit = {
|
||||||
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
|
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
|
||||||
@tailrec def pushNext(): Unit = {
|
@tailrec def pushNext(): Unit = {
|
||||||
if (!done)
|
if (!done)
|
||||||
if (iterator.isEmpty) {
|
if (iterator.isEmpty) {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
|
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
|
||||||
import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
|
import akka.stream.MaterializerSettings
|
||||||
import org.reactivestreams.{ Subscriber, Subscription }
|
import org.reactivestreams.{ Subscriber, Subscription }
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
|
@ -27,7 +27,7 @@ private[akka] object TickPublisher {
|
||||||
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
|
import akka.stream.impl.TickPublisher.TickPublisherSubscription._
|
||||||
def cancel(): Unit = ref ! Cancel(subscriber)
|
def cancel(): Unit = ref ! Cancel(subscriber)
|
||||||
def request(elements: Long): Unit =
|
def request(elements: Long): Unit =
|
||||||
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
|
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg)
|
||||||
else ref ! RequestMore(elements, subscriber)
|
else ref ! RequestMore(elements, subscriber)
|
||||||
override def toString = "TickPublisherSubscription"
|
override def toString = "TickPublisherSubscription"
|
||||||
}
|
}
|
||||||
|
|
@ -99,7 +99,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
|
||||||
|
|
||||||
def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
|
def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
|
||||||
if (demand.contains(subscriber))
|
if (demand.contains(subscriber))
|
||||||
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
|
||||||
else {
|
else {
|
||||||
val subscription = new TickPublisherSubscription(self, subscriber)
|
val subscription = new TickPublisherSubscription(self, subscriber)
|
||||||
demand(subscriber) = 0
|
demand(subscriber) = 0
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import java.util.Arrays
|
||||||
|
|
||||||
import akka.actor.{ Actor, ActorRef }
|
import akka.actor.{ Actor, ActorRef }
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
|
import akka.stream.MaterializerSettings
|
||||||
import akka.stream.actor.ActorSubscriber.OnSubscribe
|
import akka.stream.actor.ActorSubscriber.OnSubscribe
|
||||||
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
|
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
|
||||||
import akka.stream.impl._
|
import akka.stream.impl._
|
||||||
|
|
@ -205,7 +205,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp
|
||||||
if (subscriber eq null) {
|
if (subscriber eq null) {
|
||||||
subscriber = sub
|
subscriber = sub
|
||||||
subscriber.onSubscribe(new ActorSubscription(actor, subscriber))
|
subscriber.onSubscribe(new ActorSubscription(actor, subscriber))
|
||||||
} else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
|
} else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def waitingExposedPublisher: Actor.Receive = {
|
protected def waitingExposedPublisher: Actor.Receive = {
|
||||||
|
|
@ -225,7 +225,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp
|
||||||
downstreamDemand += elements
|
downstreamDemand += elements
|
||||||
if (downstreamDemand < 0) {
|
if (downstreamDemand < 0) {
|
||||||
// Long has overflown
|
// Long has overflown
|
||||||
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
|
val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue)
|
||||||
enter().finish()
|
enter().finish()
|
||||||
fail(demandOverflowException)
|
fail(demandOverflowException)
|
||||||
} else if (upstreamWaiting) {
|
} else if (upstreamWaiting) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue