diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index b88c02d857..27822c4c99 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -5,8 +5,7 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap import akka.actor.Cancellable -import akka.stream.ReactiveStreamsConstants -import akka.stream.impl.StreamSubscriptionTimeoutSupport +import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport } import org.reactivestreams.{ Publisher, Subscriber, Subscription } import akka.actor.AbstractActor import akka.actor.Actor @@ -234,7 +233,7 @@ trait ActorPublisher[T] extends Actor { demand += n if (demand < 0 && lifecycleState == Active) { // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) + val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) onError(demandOverflowException) } else super.aroundReceive(receive, msg) @@ -250,9 +249,9 @@ trait ActorPublisher[T] extends Actor { case Completed ⇒ sub.onComplete() case Active | Canceled ⇒ if (subscriber == sub) - sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else - sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) + sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) } case Cancel ⇒ @@ -343,7 +342,7 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri import ActorPublisherMessage._ override def request(n: Long): Unit = { - ReactiveStreamsConstants.validateRequest(n) + ReactiveStreamsCompliance.validateRequest(n) ref ! Request(n) } override def cancel(): Unit = ref ! Cancel diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index bc3a260ec6..8c252d0375 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.Arrays import akka.actor._ -import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } +import akka.stream.MaterializerSettings import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError } import org.reactivestreams.{ Subscriber, Subscription, Processor } @@ -186,7 +186,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D if (subscriber eq null) { subscriber = sub subscriber.onSubscribe(createSubscription()) - } else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) + } else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) } protected def waitingExposedPublisher: Actor.Receive = { @@ -205,7 +205,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D downstreamDemand += elements if (downstreamDemand < 0) { // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) + val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) cancel(demandOverflowException) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 5e0260c028..2c75716616 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -8,7 +8,7 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.util.control.{ NoStackTrace, NonFatal } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } +import akka.stream.MaterializerSettings import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.Subscription @@ -90,7 +90,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { */ private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription { override def request(elements: Long): Unit = - if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index deef5342e4..dd012d15f5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -14,7 +14,7 @@ import akka.actor.Props import akka.actor.Status import akka.actor.SupervisorStrategy import akka.stream.MaterializerSettings -import akka.stream.ReactiveStreamsConstants + import akka.pattern.pipe import org.reactivestreams.Subscriber import org.reactivestreams.Subscription @@ -35,7 +35,7 @@ private[akka] object FuturePublisher { import akka.stream.impl.FuturePublisher.FutureSubscription._ def cancel(): Unit = ref ! Cancel(this) def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) else ref ! RequestMore(this) override def toString = "FutureSubscription" } @@ -107,7 +107,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS def registerSubscriber(subscriber: Subscriber[Any]): Unit = { if (subscribers.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { val subscription = new FutureSubscription(self) subscribers = subscribers.updated(subscriber, subscription) diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index d7a6ede7e5..2874dfd221 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -9,7 +9,7 @@ import akka.actor.Actor import akka.actor.Props import akka.event.Logging import akka.stream.MaterializerSettings -import akka.stream.ReactiveStreamsConstants + import org.reactivestreams.Subscriber /** @@ -36,7 +36,7 @@ private[akka] object IteratorPublisher { */ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: MaterializerSettings) extends Actor { import IteratorPublisher._ - import ReactiveStreamsConstants._ + import ReactiveStreamsCompliance._ private var exposedPublisher: ActorPublisher[Any] = _ private var subscriber: Subscriber[Any] = _ diff --git a/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala similarity index 93% rename from akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala rename to akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index f29d2aca0a..63d7f87c05 100644 --- a/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -1,13 +1,12 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import org.reactivestreams.{ Subscription, Subscriber, Publisher } +package akka.stream.impl import scala.util.control.NonFatal +import org.reactivestreams.{ Subscriber, Publisher, Subscription } -object ReactiveStreamsConstants { +/** + * INTERNAL API + */ +private[stream] object ReactiveStreamsCompliance { final val CanNotSubscribeTheSameSubscriberMultipleTimes = "can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)" diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 023743dddf..12d88ae812 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorLogging import akka.actor.Cancellable -import akka.stream.ReactiveStreamsConstants + import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -25,7 +25,7 @@ private[akka] object MultiStreamOutputProcessor { class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { override def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) else parent ! SubstreamRequestMore(substreamKey, elements) override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def toString = "SubstreamSubscription" + System.identityHashCode(this) @@ -91,7 +91,7 @@ private[akka] object MultiStreamOutputProcessor { if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) else { state.get() match { - case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber)) + case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber)) case c: CompletedState ⇒ closeSubscriber(s, c) case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 215180e1f8..9d630c1272 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -3,8 +3,6 @@ */ package akka.stream.impl -import akka.stream.ReactiveStreamsConstants - import scala.annotation.switch import scala.annotation.tailrec import org.reactivestreams.{ Subscriber, Subscription } @@ -49,7 +47,7 @@ private[akka] trait SubscriptionWithCursor[T] extends Subscription with Resizabl val noOverflow = sum > 0 if (noOverflow) totalDemand = sum - else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue}")) + else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue}")) sum } @@ -215,7 +213,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff */ protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match { case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) case NotReached ⇒ val newSubscription = createSubscription(subscriber) subscriptions ::= newSubscription diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala index ccd2089913..a530e6493f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala @@ -4,7 +4,7 @@ package akka.stream.impl import akka.dispatch.ExecutionContexts -import akka.stream.ReactiveStreamsConstants + import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.tailrec @@ -46,7 +46,7 @@ private[akka] object SynchronousIterablePublisher { done = true override def request(elements: Long): Unit = { - if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) @tailrec def pushNext(): Unit = { if (!done) if (iterator.isEmpty) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 82085f8711..3a40be54fb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -4,7 +4,7 @@ package akka.stream.impl import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } -import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } +import akka.stream.MaterializerSettings import org.reactivestreams.{ Subscriber, Subscription } import scala.collection.mutable @@ -27,7 +27,7 @@ private[akka] object TickPublisher { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ def cancel(): Unit = ref ! Cancel(subscriber) def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) else ref ! RequestMore(elements, subscriber) override def toString = "TickPublisherSubscription" } @@ -99,7 +99,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { if (demand.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { val subscription = new TickPublisherSubscription(self, subscriber) demand(subscriber) = 0 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index e55eff8757..36c8d25536 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -7,7 +7,7 @@ import java.util.Arrays import akka.actor.{ Actor, ActorRef } import akka.event.Logging -import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } +import akka.stream.MaterializerSettings import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.stream.impl._ @@ -205,7 +205,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp if (subscriber eq null) { subscriber = sub subscriber.onSubscribe(new ActorSubscription(actor, subscriber)) - } else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) + } else sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsCompliance.SupportsOnlyASingleSubscriber}")) } protected def waitingExposedPublisher: Actor.Receive = { @@ -225,7 +225,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp downstreamDemand += elements if (downstreamDemand < 0) { // Long has overflown - val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) + val demandOverflowException = new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue) enter().finish() fail(demandOverflowException) } else if (upstreamWaiting) {