diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 17552ea2a5..e810f28f09 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -34,6 +34,8 @@ After being deprecated since 2.5.0, the following have been removed in Akka 2.6. - Use `AbstractPersistentActor` instead. * `UntypedPersistentActorWithAtLeastOnceDelivery` - Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead. +* `akka.stream.actor.ActorSubscriber` and `akka.stream.actor.ActorPublisher` + - Use `GraphStage` instead. After being deprecated since 2.2, the following have been removed in Akka 2.6. diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 03c83e2b0d..aedf1374f8 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -59,7 +59,7 @@ public class IntegrationDocTest extends AbstractJavaTest { + "} \n" + "akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox\n"); - system = ActorSystem.create("ActorPublisherDocTest", config); + system = ActorSystem.create("IntegrationDocTest", config); ref = system.actorOf(Props.create(Translator.class)); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala index a0b92f448e..961e03d825 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeSpec.scala @@ -6,8 +6,8 @@ package akka.stream.scaladsl import java.util.concurrent.ThreadLocalRandom.{ current => random } -import akka.stream.actor.ActorSubscriberMessage.OnComplete -import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.impl.ActorSubscriberMessage.OnComplete +import akka.stream.impl.ActorSubscriberMessage.OnNext import akka.stream.impl.RequestMore import akka.stream.testkit._ diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 488b668b7c..44b064a142 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -236,3 +236,7 @@ ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.javadsl.Flow.j ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.javadsl.Flow.joinMat") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.join") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.joinMat") + +# #26187 Remove ActorPublisher, ActorSubscriber +ProblemFilters.exclude[Problem]("akka.stream.actor.*") + diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala deleted file mode 100644 index e4a7aadae7..0000000000 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.actor - -import java.util.concurrent.ConcurrentHashMap - -import akka.actor._ -import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport } -import org.reactivestreams.{ Publisher, Subscriber, Subscription } - -import concurrent.duration.Duration -import concurrent.duration.FiniteDuration -import akka.stream.impl.CancelledSubscription -import akka.stream.impl.ReactiveStreamsCompliance._ -import com.github.ghik.silencer.silent - -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -object ActorPublisher { - - /** - * Create a [[org.reactivestreams.Publisher]] backed by a [[ActorPublisher]] actor. It can be - * attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a - * [[akka.stream.scaladsl.Flow]]. - */ - def apply[T](ref: ActorRef): Publisher[T] = ActorPublisherImpl(ref) - - /** - * INTERNAL API - */ - private[akka] object Internal { - final case class Subscribe(subscriber: Subscriber[Any]) - extends DeadLetterSuppression - with NoSerializationVerificationNeeded - - sealed trait LifecycleState - case object PreSubscriber extends LifecycleState - case object Active extends LifecycleState - case object Canceled extends LifecycleState - case object Completed extends LifecycleState - case object CompleteThenStop extends LifecycleState - final case class ErrorEmitted(cause: Throwable, stop: Boolean) extends LifecycleState - } -} - -sealed abstract class ActorPublisherMessage extends DeadLetterSuppression - -object ActorPublisherMessage { - - /** - * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests - * more elements. - * @param n number of requested elements - */ - final case class Request(n: Long) extends ActorPublisherMessage with NoSerializationVerificationNeeded { - private var processed = false - - /** - * INTERNAL API: needed for stash support - */ - private[akka] def markProcessed(): Unit = processed = true - - /** - * INTERNAL API: needed for stash support - */ - private[akka] def isProcessed(): Boolean = processed - } - - /** - * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the - * subscription. - */ - final case object Cancel extends Cancel with NoSerializationVerificationNeeded - sealed abstract class Cancel extends ActorPublisherMessage - - /** - * Java API: get the singleton instance of the `Cancel` message - */ - def cancelInstance = Cancel - - /** - * This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout. - * Once the actor receives this message, this publisher will already be in canceled state, thus the actor should clean-up and stop itself. - */ - final case object SubscriptionTimeoutExceeded - extends SubscriptionTimeoutExceeded - with NoSerializationVerificationNeeded - sealed abstract class SubscriptionTimeoutExceeded extends ActorPublisherMessage - - /** - * Java API: get the singleton instance of the `SubscriptionTimeoutExceeded` message - */ - def subscriptionTimeoutExceededInstance = SubscriptionTimeoutExceeded -} - -/** - * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a - * stream publisher that keeps track of the subscription life cycle and - * requested elements. - * - * Create a [[org.reactivestreams.Publisher]] backed by this actor with Scala API [[ActorPublisher#apply]], - * or Java API [[UntypedActorPublisher#create]] or Java API compatible with lambda expressions - * [[AbstractActorPublisher#create]]. - * - * It can be attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a - * [[akka.stream.scaladsl.Flow]]. You can only attach one subscriber to this publisher. - * - * The life cycle state of the subscription is tracked with the following boolean members: - * [[#isActive]], [[#isCompleted]], [[#isErrorEmitted]], and [[#isCanceled]]. - * - * You send elements to the stream by calling [[#onNext]]. You are allowed to send as many - * elements as have been requested by the stream subscriber. This amount can be inquired with - * [[#totalDemand]]. It is only allowed to use `onNext` when `isActive` and `totalDemand > 0`, - * otherwise `onNext` will throw `IllegalStateException`. - * - * When the stream subscriber requests more elements the [[ActorPublisher#Request]] message - * is delivered to this actor, and you can act on that event. The [[#totalDemand]] - * is updated automatically. - * - * When the stream subscriber cancels the subscription the [[ActorPublisher#Cancel]] message - * is delivered to this actor. After that subsequent calls to `onNext` will be ignored. - * - * You can complete the stream by calling [[#onComplete]]. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - * - * You can terminate the stream with failure by calling [[#onError]]. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - * - * If you suspect that this [[ActorPublisher]] may never get subscribed to, you can override the [[#subscriptionTimeout]] - * method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when - * the timeout triggers via an [[akka.stream.actor.ActorPublisherMessage.SubscriptionTimeoutExceeded]] message and MUST then perform cleanup and stop itself. - * - * If the actor is stopped the stream will be completed, unless it was not already terminated with - * failure, completed or canceled. - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -trait ActorPublisher[T] extends Actor { - import ActorPublisher.Internal._ - import ActorPublisherMessage._ - import ReactiveStreamsCompliance._ - private val state = ActorPublisherState(context.system) - private var subscriber: Subscriber[Any] = _ - private var demand = 0L - private var lifecycleState: LifecycleState = PreSubscriber - private var scheduledSubscriptionTimeout: Cancellable = StreamSubscriptionTimeoutSupport.NoopSubscriptionTimeout - - /** - * Subscription timeout after which this actor will become Canceled and reject any incoming "late" subscriber. - * - * The actor will receive an [[SubscriptionTimeoutExceeded]] message upon which it - * MUST react by performing all necessary cleanup and stopping itself. - * - * Use this feature in order to avoid leaking actors when you suspect that this Publisher may never get subscribed to by some Subscriber. - */ - def subscriptionTimeout: Duration = Duration.Inf - - /** - * The state when the publisher is active, i.e. before the subscriber is attached - * and when an subscriber is attached. It is allowed to - * call [[#onComplete]] and [[#onError]] in this state. It is - * allowed to call [[#onNext]] in this state when [[#totalDemand]] - * is greater than zero. - */ - final def isActive: Boolean = lifecycleState == Active || lifecycleState == PreSubscriber - - /** - * Total number of requested elements from the stream subscriber. - * This actor automatically keeps tracks of this amount based on - * incoming request messages and outgoing `onNext`. - */ - final def totalDemand: Long = demand - - /** - * The terminal state after calling [[#onComplete]]. It is not allowed to - * call [[#onNext]], [[#onError]], and [[#onComplete]] in this state. - */ - final def isCompleted: Boolean = lifecycleState == Completed - - /** - * The terminal state after calling [[#onError]]. It is not allowed to - * call [[#onNext]], [[#onError]], and [[#onComplete]] in this state. - */ - final def isErrorEmitted: Boolean = lifecycleState.isInstanceOf[ErrorEmitted] - - /** - * The state after the stream subscriber has canceled the subscription. - * It is allowed to call [[#onNext]], [[#onError]], and [[#onComplete]] in - * this state, but the calls will not perform anything. - */ - final def isCanceled: Boolean = lifecycleState == Canceled - - /** - * Send an element to the stream subscriber. You are allowed to send as many elements - * as have been requested by the stream subscriber. This amount can be inquired with - * [[#totalDemand]]. It is only allowed to use `onNext` when `isActive` and `totalDemand > 0`, - * otherwise `onNext` will throw `IllegalStateException`. - */ - def onNext(element: T): Unit = lifecycleState match { - case Active | PreSubscriber => - if (demand > 0) { - demand -= 1 - tryOnNext(subscriber, element) - } else - throw new IllegalStateException( - "onNext is not allowed when the stream has not requested elements, totalDemand was 0") - case _: ErrorEmitted => - throw new IllegalStateException("onNext must not be called after onError") - case Completed | CompleteThenStop => - throw new IllegalStateException("onNext must not be called after onComplete") - case Canceled => // drop - } - - /** - * Complete the stream. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - */ - def onComplete(): Unit = lifecycleState match { - case Active | PreSubscriber => - lifecycleState = Completed - if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives - try tryOnComplete(subscriber) - finally subscriber = null - case Completed | CompleteThenStop => - throw new IllegalStateException("onComplete must only be called once") - case _: ErrorEmitted => - throw new IllegalStateException("onComplete must not be called after onError") - case Canceled => // drop - } - - /** - * Complete the stream. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - * - * After signaling completion the Actor will then stop itself as it has completed the protocol. - * When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe - * to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well) - * will be delayed until such [[Subscriber]] arrives. - */ - def onCompleteThenStop(): Unit = lifecycleState match { - case Active | PreSubscriber => - lifecycleState = CompleteThenStop - if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives - try tryOnComplete(subscriber) - finally context.stop(self) - case _ => onComplete() - } - - /** - * Terminate the stream with failure. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - */ - def onError(cause: Throwable): Unit = lifecycleState match { - case Active | PreSubscriber => - lifecycleState = ErrorEmitted(cause, stop = false) - if (subscriber ne null) // otherwise onError will be called when the subscription arrives - try tryOnError(subscriber, cause) - finally subscriber = null - case _: ErrorEmitted => - throw new IllegalStateException("onError must only be called once") - case Completed | CompleteThenStop => - throw new IllegalStateException("onError must not be called after onComplete") - case Canceled => // drop - } - - /** - * Terminate the stream with failure. After that you are not allowed to - * call [[#onNext]], [[#onError]] and [[#onComplete]]. - * - * After signaling the Error the Actor will then stop itself as it has completed the protocol. - * When [[#onError]] is called before any [[Subscriber]] has had the chance to subscribe - * to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well) - * will be delayed until such [[Subscriber]] arrives. - */ - def onErrorThenStop(cause: Throwable): Unit = lifecycleState match { - case Active | PreSubscriber => - lifecycleState = ErrorEmitted(cause, stop = true) - if (subscriber ne null) // otherwise onError will be called when the subscription arrives - try tryOnError(subscriber, cause) - finally context.stop(self) - case _ => onError(cause) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { - case req @ Request(n) => - if (req.isProcessed()) { - // it's an unstashed Request, demand is already handled - super.aroundReceive(receive, req) - } else { - if (n < 1) { - if (lifecycleState == Active) - onError(numberOfElementsInRequestMustBePositiveException) - } else { - demand += n - if (demand < 0) - demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded - req.markProcessed() - super.aroundReceive(receive, req) - } - } - - case Subscribe(sub: Subscriber[_]) => - lifecycleState match { - case PreSubscriber => - scheduledSubscriptionTimeout.cancel() - subscriber = sub - lifecycleState = Active - tryOnSubscribe(sub, new ActorPublisherSubscription(self)) - case ErrorEmitted(cause, stop) => - if (stop) context.stop(self) - tryOnSubscribe(sub, CancelledSubscription) - tryOnError(sub, cause) - case Completed => - tryOnSubscribe(sub, CancelledSubscription) - tryOnComplete(sub) - case CompleteThenStop => - context.stop(self) - tryOnSubscribe(sub, CancelledSubscription) - tryOnComplete(sub) - case Active | Canceled => - if (subscriber eq sub) - rejectDuplicateSubscriber(sub) - else - rejectAdditionalSubscriber(sub, "ActorPublisher") - } - - case Cancel => - if (lifecycleState != Canceled) { - // possible to receive again in case of stash - cancelSelf() - super.aroundReceive(receive, msg) - } - - case SubscriptionTimeoutExceeded => - if (!scheduledSubscriptionTimeout.isCancelled) { - cancelSelf() - super.aroundReceive(receive, msg) - } - - case _ => - super.aroundReceive(receive, msg) - } - - private def cancelSelf(): Unit = { - lifecycleState = Canceled - demand = 0 - subscriber = null - } - - /** - * INTERNAL API - */ - override protected[akka] def aroundPreStart(): Unit = { - super.aroundPreStart() - import context.dispatcher - - subscriptionTimeout match { - case timeout: FiniteDuration => - scheduledSubscriptionTimeout = context.system.scheduler.scheduleOnce(timeout, self, SubscriptionTimeoutExceeded) - case _ => - // ignore... - } - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { - // some state must survive restart - state.set(self, ActorPublisherState.State(Option(subscriber), demand, lifecycleState)) - super.aroundPreRestart(reason, message) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { - state.get(self).foreach { s => - // restore previous state - subscriber = s.subscriber.orNull - demand = s.demand - lifecycleState = s.lifecycleState - } - state.remove(self) - super.aroundPostRestart(reason) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPostStop(): Unit = { - state.remove(self) - try if (lifecycleState == Active) tryOnComplete(subscriber) - finally super.aroundPostStop() - } - -} - -/** - * INTERNAL API - */ -@silent("deprecated") -private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] { - import ActorPublisher.Internal._ - - override def subscribe(sub: Subscriber[_ >: T]): Unit = { - requireNonNullSubscriber(sub) - ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]]) - } -} - -/** - * INTERNAL API - */ -private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscription { - import ActorPublisherMessage._ - - override def request(n: Long): Unit = ref ! Request(n) - override def cancel(): Unit = ref ! Cancel -} - -/** - * INTERNAL API - * Some state must survive restarts. - */ -private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState] with ExtensionIdProvider { - import ActorPublisher.Internal.LifecycleState - - override def get(system: ActorSystem): ActorPublisherState = super.get(system) - - override def lookup() = ActorPublisherState - - override def createExtension(system: ExtendedActorSystem): ActorPublisherState = - new ActorPublisherState - - final case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState) - -} - -/** - * INTERNAL API - */ -private[akka] class ActorPublisherState extends Extension { - import ActorPublisherState.State - private val state = new ConcurrentHashMap[ActorRef, State] - - def get(ref: ActorRef): Option[State] = Option(state.get(ref)) - - def set(ref: ActorRef, s: State): Unit = state.put(ref, s) - - def remove(ref: ActorRef): Unit = state.remove(ref) -} diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index e9f981cb44..4a88015ce2 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -1,343 +1,351 @@ /* - * Copyright (C) 2014-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ -package akka.stream.actor - -import java.util.concurrent.ConcurrentHashMap -import org.reactivestreams.{ Subscriber, Subscription } -import akka.actor._ -import akka.stream.impl.ReactiveStreamsCompliance - -object ActorSubscriber { - - /** - * Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] - * to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]]. - */ - def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref) - - /** - * INTERNAL API - */ - private[akka] final case class OnSubscribe(subscription: Subscription) - extends DeadLetterSuppression - with NoSerializationVerificationNeeded - -} - -sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression with NoSerializationVerificationNeeded - -object ActorSubscriberMessage { - final case class OnNext(element: Any) extends ActorSubscriberMessage - final case class OnError(cause: Throwable) extends ActorSubscriberMessage - case object OnComplete extends ActorSubscriberMessage - - /** - * Java API: get the singleton instance of the `OnComplete` message - */ - def onCompleteInstance = OnComplete -} - -/** - * An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure. - */ -trait RequestStrategy { - - /** - * Invoked by the [[ActorSubscriber]] after each incoming message to - * determine how many more elements to request from the stream. - * - * @param remainingRequested current remaining number of elements that - * have been requested from upstream but not received yet - * @return demand of more elements from the stream, returning 0 means that no - * more elements will be requested for now - */ - def requestDemand(remainingRequested: Int): Int -} - -/** - * Requests one more element when `remainingRequested` is 0, i.e. - * max one element in flight. - */ -case object OneByOneRequestStrategy extends RequestStrategy { - def requestDemand(remainingRequested: Int): Int = - if (remainingRequested == 0) 1 else 0 - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -/** - * When request is only controlled with manual calls to - * [[ActorSubscriber#request]]. - */ -case object ZeroRequestStrategy extends RequestStrategy { - def requestDemand(remainingRequested: Int): Int = 0 - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -object WatermarkRequestStrategy { - - /** - * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of - * the specified `highWatermark`. - */ - def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark) -} - -/** - * Requests up to the `highWatermark` when the `remainingRequested` is - * below the `lowWatermark`. This a good strategy when the actor performs work itself. - */ -final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { - require(lowWatermark >= 0, "lowWatermark must be >= 0") - require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark") - - /** - * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of - * the specified `highWatermark`. - */ - def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) - - def requestDemand(remainingRequested: Int): Int = - if (remainingRequested < lowWatermark) - highWatermark - remainingRequested - else 0 -} - -/** - * Requests up to the `max` and also takes the number of messages - * that have been queued internally or delegated to other actors into account. - * Concrete subclass must implement [[#inFlightInternally]]. - * It will request elements in minimum batches of the defined [[#batchSize]]. - */ -abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { - - /** - * Concrete subclass must implement this method to define how many - * messages that are currently in progress or queued. - */ - def inFlightInternally: Int - - /** - * Elements will be requested in minimum batches of this size. - * Default is 5. Subclass may override to define the batch size. - */ - def batchSize: Int = 5 - - override def requestDemand(remainingRequested: Int): Int = { - val batch = math.min(batchSize, max) - if ((remainingRequested + inFlightInternally) <= (max - batch)) - math.max(0, max - remainingRequested - inFlightInternally) - else 0 - } -} - -/** - * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a - * stream subscriber with full control of stream back pressure. It will receive - * [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]] - * messages from the stream. It can also receive other, non-stream messages, in - * the same way as any actor. - * - * Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with - * Scala API [[ActorSubscriber#apply]], or Java API [[ClassicActorSubscriber#create]] or - * Java API compatible with lambda expressions [[ClassicActorSubscriber#create]]. - * - * Subclass must define the [[RequestStrategy]] to control stream back pressure. - * After each incoming message the `ActorSubscriber` will automatically invoke - * the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream. - * The provided [[WatermarkRequestStrategy]] is a good strategy if the actor - * performs work itself. - * The provided [[MaxInFlightRequestStrategy]] is useful if messages are - * queued internally or delegated to other actors. - * You can also implement a custom [[RequestStrategy]] or call [[#request]] manually - * together with [[ZeroRequestStrategy]] or some other strategy. In that case - * you must also call [[#request]] when the actor is started or when it is ready, otherwise - * it will not receive any elements. - * - * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ -@deprecated( - "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") -trait ActorSubscriber extends Actor { - import ActorSubscriber._ - import ActorSubscriberMessage._ - - private[this] val state = ActorSubscriberState(context.system) - private[this] var subscription: Option[Subscription] = None - private[this] var requested: Long = 0 - private[this] var _canceled = false - - protected def requestStrategy: RequestStrategy - - final def canceled: Boolean = _canceled - - /** - * INTERNAL API - */ - protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { - case _: OnNext => - requested -= 1 - if (!_canceled) { - super.aroundReceive(receive, msg) - request(requestStrategy.requestDemand(remainingRequested)) - } - case OnSubscribe(sub) => - if (subscription.isEmpty) { - subscription = Some(sub) - if (_canceled) { - context.stop(self) - sub.cancel() - } else if (requested != 0) - sub.request(remainingRequested) - } else - sub.cancel() - case OnComplete | OnError(_) => - if (!_canceled) { - _canceled = true - super.aroundReceive(receive, msg) - } - case _ => - super.aroundReceive(receive, msg) - request(requestStrategy.requestDemand(remainingRequested)) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPreStart(): Unit = { - super.aroundPreStart() - request(requestStrategy.requestDemand(remainingRequested)) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { - state.get(self).foreach { s => - // restore previous state - subscription = s.subscription - requested = s.requested - _canceled = s.canceled - } - state.remove(self) - super.aroundPostRestart(reason) - request(requestStrategy.requestDemand(remainingRequested)) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { - // some state must survive restart - state.set(self, ActorSubscriberState.State(subscription, requested, _canceled)) - super.aroundPreRestart(reason, message) - } - - /** - * INTERNAL API - */ - protected[akka] override def aroundPostStop(): Unit = { - state.remove(self) - if (!_canceled) subscription.foreach(_.cancel()) - super.aroundPostStop() - } - - /** - * Request a number of elements from upstream. - */ - protected def request(elements: Long): Unit = - if (elements > 0 && !_canceled) { - // if we don't have a subscription yet, it will be requested when it arrives - subscription.foreach(_.request(elements)) - requested += elements - } - - /** - * Cancel upstream subscription. - * No more elements will be delivered after cancel. - * - * The [[ActorSubscriber]] will be stopped immediately after signaling cancellation. - * In case the upstream subscription has not yet arrived the Actor will stay alive - * until a subscription arrives, cancel it and then stop itself. - */ - protected def cancel(): Unit = - if (!_canceled) { - subscription match { - case Some(s) => - context.stop(self) - s.cancel() - case _ => - _canceled = true // cancel will be signaled once a subscription arrives - } - } - - /** - * The number of stream elements that have already been requested from upstream - * but not yet received. - */ - protected def remainingRequested: Int = longToIntMax(requested) - - private def longToIntMax(n: Long): Int = - if (n > Int.MaxValue) Int.MaxValue - else n.toInt -} - -/** - * INTERNAL API - */ -private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] { - import ActorSubscriberMessage._ - override def onError(cause: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(cause) - impl ! OnError(cause) - } - override def onComplete(): Unit = impl ! OnComplete - override def onNext(element: T): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(element) - impl ! OnNext(element) - } - override def onSubscribe(subscription: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(subscription) - impl ! ActorSubscriber.OnSubscribe(subscription) - } -} - -/** - * INTERNAL API - * Some state must survive restarts. - */ -private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider { - override def get(system: ActorSystem): ActorSubscriberState = super.get(system) - - override def lookup() = ActorSubscriberState - - override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = - new ActorSubscriberState - - final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) - -} - -/** - * INTERNAL API - */ -private[akka] class ActorSubscriberState extends Extension { - import ActorSubscriberState.State - private val state = new ConcurrentHashMap[ActorRef, State] - - def get(ref: ActorRef): Option[State] = Option(state.get(ref)) - - def set(ref: ActorRef, s: State): Unit = state.put(ref, s) - - def remove(ref: ActorRef): Unit = state.remove(ref) -} +///* +// * Copyright (C) 2019 Lightbend Inc. +// */ +// +///* +// * Copyright (C) 2014-2019 Lightbend Inc. +// */ +// +//package akka.stream.actor +// +//import java.util.concurrent.ConcurrentHashMap +//import org.reactivestreams.{ Subscriber, Subscription } +//import akka.actor._ +//import akka.stream.impl.ReactiveStreamsCompliance +// +//object ActorSubscriber { +// +// /** +// * Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] +// * to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]]. +// */ +// def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref) +// +// /** +// * INTERNAL API +// */ +// private[akka] final case class OnSubscribe(subscription: Subscription) +// extends DeadLetterSuppression +// with NoSerializationVerificationNeeded +// +//} +// +//sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression with NoSerializationVerificationNeeded +// +//object ActorSubscriberMessage { +// final case class OnNext(element: Any) extends ActorSubscriberMessage +// final case class OnError(cause: Throwable) extends ActorSubscriberMessage +// case object OnComplete extends ActorSubscriberMessage +// +// /** +// * Java API: get the singleton instance of the `OnComplete` message +// */ +// def onCompleteInstance = OnComplete +//} +// +///** +// * An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure. +// */ +//trait RequestStrategy { +// +// /** +// * Invoked by the [[ActorSubscriber]] after each incoming message to +// * determine how many more elements to request from the stream. +// * +// * @param remainingRequested current remaining number of elements that +// * have been requested from upstream but not received yet +// * @return demand of more elements from the stream, returning 0 means that no +// * more elements will be requested for now +// */ +// def requestDemand(remainingRequested: Int): Int +//} +// +///** +// * Requests one more element when `remainingRequested` is 0, i.e. +// * max one element in flight. +// */ +//case object OneByOneRequestStrategy extends RequestStrategy { +// def requestDemand(remainingRequested: Int): Int = +// if (remainingRequested == 0) 1 else 0 +// +// /** +// * Java API: get the singleton instance +// */ +// def getInstance = this +//} +// +///** +// * When request is only controlled with manual calls to +// * [[ActorSubscriber#request]]. +// */ +//case object ZeroRequestStrategy extends RequestStrategy { +// def requestDemand(remainingRequested: Int): Int = 0 +// +// /** +// * Java API: get the singleton instance +// */ +// def getInstance = this +//} +// +//object WatermarkRequestStrategy { +// +// /** +// * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of +// * the specified `highWatermark`. +// */ +// def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark) +//} +// +///** +// * Requests up to the `highWatermark` when the `remainingRequested` is +// * below the `lowWatermark`. This a good strategy when the actor performs work itself. +// */ +//final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { +// require(lowWatermark >= 0, "lowWatermark must be >= 0") +// require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark") +// +// /** +// * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of +// * the specified `highWatermark`. +// */ +// def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) +// +// def requestDemand(remainingRequested: Int): Int = +// if (remainingRequested < lowWatermark) +// highWatermark - remainingRequested +// else 0 +//} +// +///** +// * Requests up to the `max` and also takes the number of messages +// * that have been queued internally or delegated to other actors into account. +// * Concrete subclass must implement [[#inFlightInternally]]. +// * It will request elements in minimum batches of the defined [[#batchSize]]. +// */ +//abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { +// +// /** +// * Concrete subclass must implement this method to define how many +// * messages that are currently in progress or queued. +// */ +// def inFlightInternally: Int +// +// /** +// * Elements will be requested in minimum batches of this size. +// * Default is 5. Subclass may override to define the batch size. +// */ +// def batchSize: Int = 5 +// +// override def requestDemand(remainingRequested: Int): Int = { +// val batch = math.min(batchSize, max) +// if ((remainingRequested + inFlightInternally) <= (max - batch)) +// math.max(0, max - remainingRequested - inFlightInternally) +// else 0 +// } +//} +// +///** +// * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a +// * stream subscriber with full control of stream back pressure. It will receive +// * [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]] +// * messages from the stream. It can also receive other, non-stream messages, in +// * the same way as any actor. +// * +// * Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with +// * Scala API [[ActorSubscriber#apply]], or Java API [[ClassicActorSubscriber#create]] or +// * Java API compatible with lambda expressions [[ClassicActorSubscriber#create]]. +// * +// * Subclass must define the [[RequestStrategy]] to control stream back pressure. +// * After each incoming message the `ActorSubscriber` will automatically invoke +// * the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream. +// * The provided [[WatermarkRequestStrategy]] is a good strategy if the actor +// * performs work itself. +// * The provided [[MaxInFlightRequestStrategy]] is useful if messages are +// * queued internally or delegated to other actors. +// * You can also implement a custom [[RequestStrategy]] or call [[#request]] manually +// * together with [[ZeroRequestStrategy]] or some other strategy. In that case +// * you must also call [[#request]] when the actor is started or when it is ready, otherwise +// * it will not receive any elements. +// * +// * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. +// */ +//@deprecated( +// "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", +// since = "2.5.0") +//trait ActorSubscriber extends Actor { +// import ActorSubscriber._ +// import ActorSubscriberMessage._ +// +// private[this] val state = ActorSubscriberState(context.system) +// private[this] var subscription: Option[Subscription] = None +// private[this] var requested: Long = 0 +// private[this] var _canceled = false +// +// protected def requestStrategy: RequestStrategy +// +// final def canceled: Boolean = _canceled +// +// /** +// * INTERNAL API +// */ +// protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { +// case _: OnNext => +// requested -= 1 +// if (!_canceled) { +// super.aroundReceive(receive, msg) +// request(requestStrategy.requestDemand(remainingRequested)) +// } +// case OnSubscribe(sub) => +// if (subscription.isEmpty) { +// subscription = Some(sub) +// if (_canceled) { +// context.stop(self) +// sub.cancel() +// } else if (requested != 0) +// sub.request(remainingRequested) +// } else +// sub.cancel() +// case OnComplete | OnError(_) => +// if (!_canceled) { +// _canceled = true +// super.aroundReceive(receive, msg) +// } +// case _ => +// super.aroundReceive(receive, msg) +// request(requestStrategy.requestDemand(remainingRequested)) +// } +// +// /** +// * INTERNAL API +// */ +// protected[akka] override def aroundPreStart(): Unit = { +// super.aroundPreStart() +// request(requestStrategy.requestDemand(remainingRequested)) +// } +// +// /** +// * INTERNAL API +// */ +// protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { +// state.get(self).foreach { s => +// // restore previous state +// subscription = s.subscription +// requested = s.requested +// _canceled = s.canceled +// } +// state.remove(self) +// super.aroundPostRestart(reason) +// request(requestStrategy.requestDemand(remainingRequested)) +// } +// +// /** +// * INTERNAL API +// */ +// protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { +// // some state must survive restart +// state.set(self, ActorSubscriberState.State(subscription, requested, _canceled)) +// super.aroundPreRestart(reason, message) +// } +// +// /** +// * INTERNAL API +// */ +// protected[akka] override def aroundPostStop(): Unit = { +// state.remove(self) +// if (!_canceled) subscription.foreach(_.cancel()) +// super.aroundPostStop() +// } +// +// /** +// * Request a number of elements from upstream. +// */ +// protected def request(elements: Long): Unit = +// if (elements > 0 && !_canceled) { +// // if we don't have a subscription yet, it will be requested when it arrives +// subscription.foreach(_.request(elements)) +// requested += elements +// } +// +// /** +// * Cancel upstream subscription. +// * No more elements will be delivered after cancel. +// * +// * The [[ActorSubscriber]] will be stopped immediately after signaling cancellation. +// * In case the upstream subscription has not yet arrived the Actor will stay alive +// * until a subscription arrives, cancel it and then stop itself. +// */ +// protected def cancel(): Unit = +// if (!_canceled) { +// subscription match { +// case Some(s) => +// context.stop(self) +// s.cancel() +// case _ => +// _canceled = true // cancel will be signaled once a subscription arrives +// } +// } +// +// /** +// * The number of stream elements that have already been requested from upstream +// * but not yet received. +// */ +// protected def remainingRequested: Int = longToIntMax(requested) +// +// private def longToIntMax(n: Long): Int = +// if (n > Int.MaxValue) Int.MaxValue +// else n.toInt +//} +// +///** +// * INTERNAL API +// */ +//private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] { +// import ActorSubscriberMessage._ +// override def onError(cause: Throwable): Unit = { +// ReactiveStreamsCompliance.requireNonNullException(cause) +// impl ! OnError(cause) +// } +// override def onComplete(): Unit = impl ! OnComplete +// override def onNext(element: T): Unit = { +// ReactiveStreamsCompliance.requireNonNullElement(element) +// impl ! OnNext(element) +// } +// override def onSubscribe(subscription: Subscription): Unit = { +// ReactiveStreamsCompliance.requireNonNullSubscription(subscription) +// impl ! ActorSubscriber.OnSubscribe(subscription) +// } +//} +// +///** +// * INTERNAL API +// * Some state must survive restarts. +// */ +//private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider { +// override def get(system: ActorSystem): ActorSubscriberState = super.get(system) +// +// override def lookup() = ActorSubscriberState +// +// override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = +// new ActorSubscriberState +// +// final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) +// +//} +// +///** +// * INTERNAL API +// */ +//private[akka] class ActorSubscriberState extends Extension { +// import ActorSubscriberState.State +// private val state = new ConcurrentHashMap[ActorRef, State] +// +// def get(ref: ActorRef): Option[State] = Option(state.get(ref)) +// +// def set(ref: ActorRef, s: State): Unit = state.put(ref, s) +// +// def remove(ref: ActorRef): Unit = state.remove(ref) +//} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 091fa78f42..cc674428f4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -7,8 +7,7 @@ package akka.stream.impl import akka.actor._ import akka.annotation.InternalApi import akka.stream.{ AbruptTerminationException, Attributes } -import akka.stream.actor.ActorSubscriber.OnSubscribe -import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext } +import akka.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe } import org.reactivestreams.{ Processor, Subscriber, Subscription } import akka.event.Logging import akka.stream.ActorAttributes diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 56b21f1b71..57051e41d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable import scala.util.control.NoStackTrace + import akka.actor.{ Actor, ActorRef, Terminated } import akka.annotation.InternalApi import org.reactivestreams.{ Publisher, Subscriber } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorSubscriberMessage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorSubscriberMessage.scala new file mode 100644 index 0000000000..34bac8bb26 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorSubscriberMessage.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.DeadLetterSuppression +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.InternalApi +import org.reactivestreams.Subscription + +/** + * INTERNAL API + */ +@InternalApi private[akka] sealed abstract class ActorSubscriberMessage + extends DeadLetterSuppression + with NoSerializationVerificationNeeded + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ActorSubscriberMessage { + final case class OnNext(element: Any) extends ActorSubscriberMessage + final case class OnError(cause: Throwable) extends ActorSubscriberMessage + case object OnComplete extends ActorSubscriberMessage + + // OnSubscribe doesn't extend ActorSubscriberMessage by design, because `OnNext`, `OnError` and `OnComplete` + // are used together, with the same `seal`, but not always `OnSubscribe`. + final case class OnSubscribe(subscription: Subscription) + extends DeadLetterSuppression + with NoSerializationVerificationNeeded + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 037bfb6d31..789b7a6663 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -9,7 +9,6 @@ import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream.ActorAttributes import akka.stream.Attributes import akka.stream.AbruptTerminationException -import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage } import akka.util.unused import org.reactivestreams.{ Subscriber, Subscription } @@ -236,7 +235,7 @@ import org.reactivestreams.{ Subscriber, Subscription } def subreceive: SubReceive = new SubReceive({ case OnSubscribe(id, subscription) => - inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription)) + inputs(id).subreceive(ActorSubscriberMessage.OnSubscribe(subscription)) case OnNext(id, elem) => if (marked(id) && !pending(id)) markedPending += 1 pending(id, on = true) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index eb361e223e..d4f72e3200 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -5,7 +5,6 @@ package akka.stream.impl import akka.NotUsed -import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule @@ -13,7 +12,6 @@ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import akka.event.Logging -import com.github.ghik.silencer.silent /** * INTERNAL API @@ -87,26 +85,3 @@ import com.github.ghik.silencer.silent override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attr, amendShape(attr)) } - -/** - * INTERNAL API - * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, - * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. - */ -@InternalApi private[akka] final class ActorPublisherSource[Out]( - props: Props, - val attributes: Attributes, - shape: SourceShape[Out]) - extends SourceModule[Out, ActorRef](shape) { - - @silent("deprecated") - override def create(context: MaterializationContext) = { - val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) - (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = - new ActorPublisherSource[Out](props, attributes, shape) - override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] = - new ActorPublisherSource(props, attr, amendShape(attr)) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 908acfbd14..56a67a54e6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -7,8 +7,6 @@ package akka.stream.impl import java.util.function.BinaryOperator import akka.NotUsed -import akka.actor.ActorRef -import akka.actor.Props import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts @@ -165,28 +163,6 @@ import scala.util.control.NonFatal override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr)) } -/** - * INTERNAL API - * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, - * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. - */ -@InternalApi private[akka] final class ActorSubscriberSink[In]( - props: Props, - val attributes: Attributes, - shape: SinkShape[In]) - extends SinkModule[In, ActorRef](shape) { - - override def create(context: MaterializationContext) = { - val subscriberRef = context.materializer.actorOf(context, props) - (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef) - } - - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = - new ActorSubscriberSink[In](props, attributes, shape) - override def withAttributes(attr: Attributes): SinkModule[In, ActorRef] = - new ActorSubscriberSink[In](props, attr, amendShape(attr)) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 070b9159e3..4348b24841 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -12,7 +12,7 @@ import akka.annotation.InternalApi import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream._ -import akka.stream.actor.ActorSubscriberMessage +import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.TraversalBuilder diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 521d4c4c9f..3738fcd395 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -9,7 +9,6 @@ import akka.actor.{ ActorRef, Terminated } import akka.annotation.InternalApi import akka.event.Logging import akka.stream._ -import akka.stream.actor.{ RequestStrategy, WatermarkRequestStrategy } import akka.stream.impl.FixedSizeBuffer import akka.stream.scaladsl.Source import akka.stream.stage._ @@ -28,6 +27,43 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e */ @InternalApi private[stream] object SourceRefStageImpl { private sealed trait ActorRefStage { def ref: ActorRef } + + object WatermarkRequestStrategy { + + /** + * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of + * the specified `highWatermark`. + */ + def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark) + } + + /** + * Requests up to the `highWatermark` when the `remainingRequested` is + * below the `lowWatermark`. This a good strategy when the actor performs work itself. + */ + final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) { + require(lowWatermark >= 0, "lowWatermark must be >= 0") + require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark") + + /** + * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of + * the specified `highWatermark`. + */ + def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) + + /** + * Invoked after each incoming message to determine how many more elements to request from the stream. + * + * @param remainingRequested current remaining number of elements that + * have been requested from upstream but not received yet + * @return demand of more elements from the stream, returning 0 means that no + * more elements will be requested for now + */ + def requestDemand(remainingRequested: Int): Int = + if (remainingRequested < lowWatermark) + highWatermark - remainingRequested + else 0 + } } /** @@ -40,6 +76,7 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef]) extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage => import SourceRefStageImpl.ActorRefStage + import SourceRefStageImpl.WatermarkRequestStrategy val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out") override def shape = SourceShape.of(out) @@ -106,7 +143,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity) - private val requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) + private val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy( + highWatermark = receiveBuffer.capacity) // end of demand management --- // initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 79b76b2818..fe07d47e5f 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -12,7 +12,7 @@ import akka.annotation.InternalApi import akka.japi.function.{ Effect, Procedure } import akka.pattern.ask import akka.stream._ -import akka.stream.actor.ActorSubscriberMessage +import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } import akka.stream.scaladsl.GenericGraphWithChangedAttributes