Merge pull request #27707 from akka/wip-26187-finally-patriknw

Final removal of ActorPublisher and ActorSubscriber, #26187
This commit is contained in:
Patrik Nordwall 2019-09-24 13:31:25 +02:00 committed by GitHub
commit b94f3c3bcc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 435 additions and 861 deletions

View file

@ -34,6 +34,8 @@ After being deprecated since 2.5.0, the following have been removed in Akka 2.6.
- Use `AbstractPersistentActor` instead. - Use `AbstractPersistentActor` instead.
* `UntypedPersistentActorWithAtLeastOnceDelivery` * `UntypedPersistentActorWithAtLeastOnceDelivery`
- Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead. - Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead.
* `akka.stream.actor.ActorSubscriber` and `akka.stream.actor.ActorPublisher`
- Use `GraphStage` instead.
After being deprecated since 2.2, the following have been removed in Akka 2.6. After being deprecated since 2.2, the following have been removed in Akka 2.6.

View file

@ -59,7 +59,7 @@ public class IntegrationDocTest extends AbstractJavaTest {
+ "} \n" + "} \n"
+ "akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox\n"); + "akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox\n");
system = ActorSystem.create("ActorPublisherDocTest", config); system = ActorSystem.create("IntegrationDocTest", config);
ref = system.actorOf(Props.create(Translator.class)); ref = system.actorOf(Props.create(Translator.class));
} }

View file

@ -6,8 +6,8 @@ package akka.stream.scaladsl
import java.util.concurrent.ThreadLocalRandom.{ current => random } import java.util.concurrent.ThreadLocalRandom.{ current => random }
import akka.stream.actor.ActorSubscriberMessage.OnComplete import akka.stream.impl.ActorSubscriberMessage.OnComplete
import akka.stream.actor.ActorSubscriberMessage.OnNext import akka.stream.impl.ActorSubscriberMessage.OnNext
import akka.stream.impl.RequestMore import akka.stream.impl.RequestMore
import akka.stream.testkit._ import akka.stream.testkit._

View file

@ -236,3 +236,7 @@ ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.javadsl.Flow.j
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.javadsl.Flow.joinMat") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.javadsl.Flow.joinMat")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.join") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.join")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.joinMat") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.joinMat")
# #26187 Remove ActorPublisher, ActorSubscriber
ProblemFilters.exclude[Problem]("akka.stream.actor.*")

View file

@ -1,461 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.actor
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
import akka.stream.impl.{ ReactiveStreamsCompliance, StreamSubscriptionTimeoutSupport }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import concurrent.duration.Duration
import concurrent.duration.FiniteDuration
import akka.stream.impl.CancelledSubscription
import akka.stream.impl.ReactiveStreamsCompliance._
import com.github.ghik.silencer.silent
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
object ActorPublisher {
/**
* Create a [[org.reactivestreams.Publisher]] backed by a [[ActorPublisher]] actor. It can be
* attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.scaladsl.Flow]].
*/
def apply[T](ref: ActorRef): Publisher[T] = ActorPublisherImpl(ref)
/**
* INTERNAL API
*/
private[akka] object Internal {
final case class Subscribe(subscriber: Subscriber[Any])
extends DeadLetterSuppression
with NoSerializationVerificationNeeded
sealed trait LifecycleState
case object PreSubscriber extends LifecycleState
case object Active extends LifecycleState
case object Canceled extends LifecycleState
case object Completed extends LifecycleState
case object CompleteThenStop extends LifecycleState
final case class ErrorEmitted(cause: Throwable, stop: Boolean) extends LifecycleState
}
}
sealed abstract class ActorPublisherMessage extends DeadLetterSuppression
object ActorPublisherMessage {
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber requests
* more elements.
* @param n number of requested elements
*/
final case class Request(n: Long) extends ActorPublisherMessage with NoSerializationVerificationNeeded {
private var processed = false
/**
* INTERNAL API: needed for stash support
*/
private[akka] def markProcessed(): Unit = processed = true
/**
* INTERNAL API: needed for stash support
*/
private[akka] def isProcessed(): Boolean = processed
}
/**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
* subscription.
*/
final case object Cancel extends Cancel with NoSerializationVerificationNeeded
sealed abstract class Cancel extends ActorPublisherMessage
/**
* Java API: get the singleton instance of the `Cancel` message
*/
def cancelInstance = Cancel
/**
* This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout.
* Once the actor receives this message, this publisher will already be in canceled state, thus the actor should clean-up and stop itself.
*/
final case object SubscriptionTimeoutExceeded
extends SubscriptionTimeoutExceeded
with NoSerializationVerificationNeeded
sealed abstract class SubscriptionTimeoutExceeded extends ActorPublisherMessage
/**
* Java API: get the singleton instance of the `SubscriptionTimeoutExceeded` message
*/
def subscriptionTimeoutExceededInstance = SubscriptionTimeoutExceeded
}
/**
* Extend/mixin this trait in your [[akka.actor.Actor]] to make it a
* stream publisher that keeps track of the subscription life cycle and
* requested elements.
*
* Create a [[org.reactivestreams.Publisher]] backed by this actor with Scala API [[ActorPublisher#apply]],
* or Java API [[UntypedActorPublisher#create]] or Java API compatible with lambda expressions
* [[AbstractActorPublisher#create]].
*
* It can be attached to a [[org.reactivestreams.Subscriber]] or be used as an input source for a
* [[akka.stream.scaladsl.Flow]]. You can only attach one subscriber to this publisher.
*
* The life cycle state of the subscription is tracked with the following boolean members:
* [[#isActive]], [[#isCompleted]], [[#isErrorEmitted]], and [[#isCanceled]].
*
* You send elements to the stream by calling [[#onNext]]. You are allowed to send as many
* elements as have been requested by the stream subscriber. This amount can be inquired with
* [[#totalDemand]]. It is only allowed to use `onNext` when `isActive` and `totalDemand > 0`,
* otherwise `onNext` will throw `IllegalStateException`.
*
* When the stream subscriber requests more elements the [[ActorPublisher#Request]] message
* is delivered to this actor, and you can act on that event. The [[#totalDemand]]
* is updated automatically.
*
* When the stream subscriber cancels the subscription the [[ActorPublisher#Cancel]] message
* is delivered to this actor. After that subsequent calls to `onNext` will be ignored.
*
* You can complete the stream by calling [[#onComplete]]. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* You can terminate the stream with failure by calling [[#onError]]. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* If you suspect that this [[ActorPublisher]] may never get subscribed to, you can override the [[#subscriptionTimeout]]
* method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
* the timeout triggers via an [[akka.stream.actor.ActorPublisherMessage.SubscriptionTimeoutExceeded]] message and MUST then perform cleanup and stop itself.
*
* If the actor is stopped the stream will be completed, unless it was not already terminated with
* failure, completed or canceled.
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
trait ActorPublisher[T] extends Actor {
import ActorPublisher.Internal._
import ActorPublisherMessage._
import ReactiveStreamsCompliance._
private val state = ActorPublisherState(context.system)
private var subscriber: Subscriber[Any] = _
private var demand = 0L
private var lifecycleState: LifecycleState = PreSubscriber
private var scheduledSubscriptionTimeout: Cancellable = StreamSubscriptionTimeoutSupport.NoopSubscriptionTimeout
/**
* Subscription timeout after which this actor will become Canceled and reject any incoming "late" subscriber.
*
* The actor will receive an [[SubscriptionTimeoutExceeded]] message upon which it
* MUST react by performing all necessary cleanup and stopping itself.
*
* Use this feature in order to avoid leaking actors when you suspect that this Publisher may never get subscribed to by some Subscriber.
*/
def subscriptionTimeout: Duration = Duration.Inf
/**
* The state when the publisher is active, i.e. before the subscriber is attached
* and when an subscriber is attached. It is allowed to
* call [[#onComplete]] and [[#onError]] in this state. It is
* allowed to call [[#onNext]] in this state when [[#totalDemand]]
* is greater than zero.
*/
final def isActive: Boolean = lifecycleState == Active || lifecycleState == PreSubscriber
/**
* Total number of requested elements from the stream subscriber.
* This actor automatically keeps tracks of this amount based on
* incoming request messages and outgoing `onNext`.
*/
final def totalDemand: Long = demand
/**
* The terminal state after calling [[#onComplete]]. It is not allowed to
* call [[#onNext]], [[#onError]], and [[#onComplete]] in this state.
*/
final def isCompleted: Boolean = lifecycleState == Completed
/**
* The terminal state after calling [[#onError]]. It is not allowed to
* call [[#onNext]], [[#onError]], and [[#onComplete]] in this state.
*/
final def isErrorEmitted: Boolean = lifecycleState.isInstanceOf[ErrorEmitted]
/**
* The state after the stream subscriber has canceled the subscription.
* It is allowed to call [[#onNext]], [[#onError]], and [[#onComplete]] in
* this state, but the calls will not perform anything.
*/
final def isCanceled: Boolean = lifecycleState == Canceled
/**
* Send an element to the stream subscriber. You are allowed to send as many elements
* as have been requested by the stream subscriber. This amount can be inquired with
* [[#totalDemand]]. It is only allowed to use `onNext` when `isActive` and `totalDemand > 0`,
* otherwise `onNext` will throw `IllegalStateException`.
*/
def onNext(element: T): Unit = lifecycleState match {
case Active | PreSubscriber =>
if (demand > 0) {
demand -= 1
tryOnNext(subscriber, element)
} else
throw new IllegalStateException(
"onNext is not allowed when the stream has not requested elements, totalDemand was 0")
case _: ErrorEmitted =>
throw new IllegalStateException("onNext must not be called after onError")
case Completed | CompleteThenStop =>
throw new IllegalStateException("onNext must not be called after onComplete")
case Canceled => // drop
}
/**
* Complete the stream. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*/
def onComplete(): Unit = lifecycleState match {
case Active | PreSubscriber =>
lifecycleState = Completed
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
try tryOnComplete(subscriber)
finally subscriber = null
case Completed | CompleteThenStop =>
throw new IllegalStateException("onComplete must only be called once")
case _: ErrorEmitted =>
throw new IllegalStateException("onComplete must not be called after onError")
case Canceled => // drop
}
/**
* Complete the stream. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* After signaling completion the Actor will then stop itself as it has completed the protocol.
* When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe
* to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well)
* will be delayed until such [[Subscriber]] arrives.
*/
def onCompleteThenStop(): Unit = lifecycleState match {
case Active | PreSubscriber =>
lifecycleState = CompleteThenStop
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
try tryOnComplete(subscriber)
finally context.stop(self)
case _ => onComplete()
}
/**
* Terminate the stream with failure. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*/
def onError(cause: Throwable): Unit = lifecycleState match {
case Active | PreSubscriber =>
lifecycleState = ErrorEmitted(cause, stop = false)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives
try tryOnError(subscriber, cause)
finally subscriber = null
case _: ErrorEmitted =>
throw new IllegalStateException("onError must only be called once")
case Completed | CompleteThenStop =>
throw new IllegalStateException("onError must not be called after onComplete")
case Canceled => // drop
}
/**
* Terminate the stream with failure. After that you are not allowed to
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*
* After signaling the Error the Actor will then stop itself as it has completed the protocol.
* When [[#onError]] is called before any [[Subscriber]] has had the chance to subscribe
* to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well)
* will be delayed until such [[Subscriber]] arrives.
*/
def onErrorThenStop(cause: Throwable): Unit = lifecycleState match {
case Active | PreSubscriber =>
lifecycleState = ErrorEmitted(cause, stop = true)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives
try tryOnError(subscriber, cause)
finally context.stop(self)
case _ => onError(cause)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
case req @ Request(n) =>
if (req.isProcessed()) {
// it's an unstashed Request, demand is already handled
super.aroundReceive(receive, req)
} else {
if (n < 1) {
if (lifecycleState == Active)
onError(numberOfElementsInRequestMustBePositiveException)
} else {
demand += n
if (demand < 0)
demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
req.markProcessed()
super.aroundReceive(receive, req)
}
}
case Subscribe(sub: Subscriber[_]) =>
lifecycleState match {
case PreSubscriber =>
scheduledSubscriptionTimeout.cancel()
subscriber = sub
lifecycleState = Active
tryOnSubscribe(sub, new ActorPublisherSubscription(self))
case ErrorEmitted(cause, stop) =>
if (stop) context.stop(self)
tryOnSubscribe(sub, CancelledSubscription)
tryOnError(sub, cause)
case Completed =>
tryOnSubscribe(sub, CancelledSubscription)
tryOnComplete(sub)
case CompleteThenStop =>
context.stop(self)
tryOnSubscribe(sub, CancelledSubscription)
tryOnComplete(sub)
case Active | Canceled =>
if (subscriber eq sub)
rejectDuplicateSubscriber(sub)
else
rejectAdditionalSubscriber(sub, "ActorPublisher")
}
case Cancel =>
if (lifecycleState != Canceled) {
// possible to receive again in case of stash
cancelSelf()
super.aroundReceive(receive, msg)
}
case SubscriptionTimeoutExceeded =>
if (!scheduledSubscriptionTimeout.isCancelled) {
cancelSelf()
super.aroundReceive(receive, msg)
}
case _ =>
super.aroundReceive(receive, msg)
}
private def cancelSelf(): Unit = {
lifecycleState = Canceled
demand = 0
subscriber = null
}
/**
* INTERNAL API
*/
override protected[akka] def aroundPreStart(): Unit = {
super.aroundPreStart()
import context.dispatcher
subscriptionTimeout match {
case timeout: FiniteDuration =>
scheduledSubscriptionTimeout = context.system.scheduler.scheduleOnce(timeout, self, SubscriptionTimeoutExceeded)
case _ =>
// ignore...
}
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
// some state must survive restart
state.set(self, ActorPublisherState.State(Option(subscriber), demand, lifecycleState))
super.aroundPreRestart(reason, message)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostRestart(reason: Throwable): Unit = {
state.get(self).foreach { s =>
// restore previous state
subscriber = s.subscriber.orNull
demand = s.demand
lifecycleState = s.lifecycleState
}
state.remove(self)
super.aroundPostRestart(reason)
}
/**
* INTERNAL API
*/
protected[akka] override def aroundPostStop(): Unit = {
state.remove(self)
try if (lifecycleState == Active) tryOnComplete(subscriber)
finally super.aroundPostStop()
}
}
/**
* INTERNAL API
*/
@silent("deprecated")
private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] {
import ActorPublisher.Internal._
override def subscribe(sub: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(sub)
ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]])
}
}
/**
* INTERNAL API
*/
private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscription {
import ActorPublisherMessage._
override def request(n: Long): Unit = ref ! Request(n)
override def cancel(): Unit = ref ! Cancel
}
/**
* INTERNAL API
* Some state must survive restarts.
*/
private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState] with ExtensionIdProvider {
import ActorPublisher.Internal.LifecycleState
override def get(system: ActorSystem): ActorPublisherState = super.get(system)
override def lookup() = ActorPublisherState
override def createExtension(system: ExtendedActorSystem): ActorPublisherState =
new ActorPublisherState
final case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState)
}
/**
* INTERNAL API
*/
private[akka] class ActorPublisherState extends Extension {
import ActorPublisherState.State
private val state = new ConcurrentHashMap[ActorRef, State]
def get(ref: ActorRef): Option[State] = Option(state.get(ref))
def set(ref: ActorRef, s: State): Unit = state.put(ref, s)
def remove(ref: ActorRef): Unit = state.remove(ref)
}

View file

@ -1,343 +1,351 @@
/* /*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com> * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/ */
package akka.stream.actor ///*
// * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
import java.util.concurrent.ConcurrentHashMap // */
import org.reactivestreams.{ Subscriber, Subscription } //
import akka.actor._ ///*
import akka.stream.impl.ReactiveStreamsCompliance // * Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
// */
object ActorSubscriber { //
//package akka.stream.actor
/** //
* Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]] //import java.util.concurrent.ConcurrentHashMap
* to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]]. //import org.reactivestreams.{ Subscriber, Subscription }
*/ //import akka.actor._
def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref) //import akka.stream.impl.ReactiveStreamsCompliance
//
/** //object ActorSubscriber {
* INTERNAL API //
*/ // /**
private[akka] final case class OnSubscribe(subscription: Subscription) // * Attach a [[ActorSubscriber]] actor as a [[org.reactivestreams.Subscriber]]
extends DeadLetterSuppression // * to a [[org.reactivestreams.Publisher]] or [[akka.stream.scaladsl.Flow]].
with NoSerializationVerificationNeeded // */
// def apply[T](ref: ActorRef): Subscriber[T] = new ActorSubscriberImpl(ref)
} //
// /**
sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression with NoSerializationVerificationNeeded // * INTERNAL API
// */
object ActorSubscriberMessage { // private[akka] final case class OnSubscribe(subscription: Subscription)
final case class OnNext(element: Any) extends ActorSubscriberMessage // extends DeadLetterSuppression
final case class OnError(cause: Throwable) extends ActorSubscriberMessage // with NoSerializationVerificationNeeded
case object OnComplete extends ActorSubscriberMessage //
//}
/** //
* Java API: get the singleton instance of the `OnComplete` message //sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression with NoSerializationVerificationNeeded
*/ //
def onCompleteInstance = OnComplete //object ActorSubscriberMessage {
} // final case class OnNext(element: Any) extends ActorSubscriberMessage
// final case class OnError(cause: Throwable) extends ActorSubscriberMessage
/** // case object OnComplete extends ActorSubscriberMessage
* An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure. //
*/ // /**
trait RequestStrategy { // * Java API: get the singleton instance of the `OnComplete` message
// */
/** // def onCompleteInstance = OnComplete
* Invoked by the [[ActorSubscriber]] after each incoming message to //}
* determine how many more elements to request from the stream. //
* ///**
* @param remainingRequested current remaining number of elements that // * An [[ActorSubscriber]] defines a `RequestStrategy` to control the stream back pressure.
* have been requested from upstream but not received yet // */
* @return demand of more elements from the stream, returning 0 means that no //trait RequestStrategy {
* more elements will be requested for now //
*/ // /**
def requestDemand(remainingRequested: Int): Int // * Invoked by the [[ActorSubscriber]] after each incoming message to
} // * determine how many more elements to request from the stream.
// *
/** // * @param remainingRequested current remaining number of elements that
* Requests one more element when `remainingRequested` is 0, i.e. // * have been requested from upstream but not received yet
* max one element in flight. // * @return demand of more elements from the stream, returning 0 means that no
*/ // * more elements will be requested for now
case object OneByOneRequestStrategy extends RequestStrategy { // */
def requestDemand(remainingRequested: Int): Int = // def requestDemand(remainingRequested: Int): Int
if (remainingRequested == 0) 1 else 0 //}
//
/** ///**
* Java API: get the singleton instance // * Requests one more element when `remainingRequested` is 0, i.e.
*/ // * max one element in flight.
def getInstance = this // */
} //case object OneByOneRequestStrategy extends RequestStrategy {
// def requestDemand(remainingRequested: Int): Int =
/** // if (remainingRequested == 0) 1 else 0
* When request is only controlled with manual calls to //
* [[ActorSubscriber#request]]. // /**
*/ // * Java API: get the singleton instance
case object ZeroRequestStrategy extends RequestStrategy { // */
def requestDemand(remainingRequested: Int): Int = 0 // def getInstance = this
//}
/** //
* Java API: get the singleton instance ///**
*/ // * When request is only controlled with manual calls to
def getInstance = this // * [[ActorSubscriber#request]].
} // */
//case object ZeroRequestStrategy extends RequestStrategy {
object WatermarkRequestStrategy { // def requestDemand(remainingRequested: Int): Int = 0
//
/** // /**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of // * Java API: get the singleton instance
* the specified `highWatermark`. // */
*/ // def getInstance = this
def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark) //}
} //
//object WatermarkRequestStrategy {
/** //
* Requests up to the `highWatermark` when the `remainingRequested` is // /**
* below the `lowWatermark`. This a good strategy when the actor performs work itself. // * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
*/ // * the specified `highWatermark`.
final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { // */
require(lowWatermark >= 0, "lowWatermark must be >= 0") // def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark)
require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark") //}
//
/** ///**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of // * Requests up to the `highWatermark` when the `remainingRequested` is
* the specified `highWatermark`. // * below the `lowWatermark`. This a good strategy when the actor performs work itself.
*/ // */
def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2)) //final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy {
// require(lowWatermark >= 0, "lowWatermark must be >= 0")
def requestDemand(remainingRequested: Int): Int = // require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark")
if (remainingRequested < lowWatermark) //
highWatermark - remainingRequested // /**
else 0 // * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
} // * the specified `highWatermark`.
// */
/** // def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2))
* Requests up to the `max` and also takes the number of messages //
* that have been queued internally or delegated to other actors into account. // def requestDemand(remainingRequested: Int): Int =
* Concrete subclass must implement [[#inFlightInternally]]. // if (remainingRequested < lowWatermark)
* It will request elements in minimum batches of the defined [[#batchSize]]. // highWatermark - remainingRequested
*/ // else 0
abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { //}
//
/** ///**
* Concrete subclass must implement this method to define how many // * Requests up to the `max` and also takes the number of messages
* messages that are currently in progress or queued. // * that have been queued internally or delegated to other actors into account.
*/ // * Concrete subclass must implement [[#inFlightInternally]].
def inFlightInternally: Int // * It will request elements in minimum batches of the defined [[#batchSize]].
// */
/** //abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy {
* Elements will be requested in minimum batches of this size. //
* Default is 5. Subclass may override to define the batch size. // /**
*/ // * Concrete subclass must implement this method to define how many
def batchSize: Int = 5 // * messages that are currently in progress or queued.
// */
override def requestDemand(remainingRequested: Int): Int = { // def inFlightInternally: Int
val batch = math.min(batchSize, max) //
if ((remainingRequested + inFlightInternally) <= (max - batch)) // /**
math.max(0, max - remainingRequested - inFlightInternally) // * Elements will be requested in minimum batches of this size.
else 0 // * Default is 5. Subclass may override to define the batch size.
} // */
} // def batchSize: Int = 5
//
/** // override def requestDemand(remainingRequested: Int): Int = {
* Extend/mixin this trait in your [[akka.actor.Actor]] to make it a // val batch = math.min(batchSize, max)
* stream subscriber with full control of stream back pressure. It will receive // if ((remainingRequested + inFlightInternally) <= (max - batch))
* [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]] // math.max(0, max - remainingRequested - inFlightInternally)
* messages from the stream. It can also receive other, non-stream messages, in // else 0
* the same way as any actor. // }
* //}
* Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with //
* Scala API [[ActorSubscriber#apply]], or Java API [[ClassicActorSubscriber#create]] or ///**
* Java API compatible with lambda expressions [[ClassicActorSubscriber#create]]. // * Extend/mixin this trait in your [[akka.actor.Actor]] to make it a
* // * stream subscriber with full control of stream back pressure. It will receive
* Subclass must define the [[RequestStrategy]] to control stream back pressure. // * [[ActorSubscriberMessage.OnNext]], [[ActorSubscriberMessage.OnComplete]] and [[ActorSubscriberMessage.OnError]]
* After each incoming message the `ActorSubscriber` will automatically invoke // * messages from the stream. It can also receive other, non-stream messages, in
* the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream. // * the same way as any actor.
* The provided [[WatermarkRequestStrategy]] is a good strategy if the actor // *
* performs work itself. // * Attach the actor as a [[org.reactivestreams.Subscriber]] to the stream with
* The provided [[MaxInFlightRequestStrategy]] is useful if messages are // * Scala API [[ActorSubscriber#apply]], or Java API [[ClassicActorSubscriber#create]] or
* queued internally or delegated to other actors. // * Java API compatible with lambda expressions [[ClassicActorSubscriber#create]].
* You can also implement a custom [[RequestStrategy]] or call [[#request]] manually // *
* together with [[ZeroRequestStrategy]] or some other strategy. In that case // * Subclass must define the [[RequestStrategy]] to control stream back pressure.
* you must also call [[#request]] when the actor is started or when it is ready, otherwise // * After each incoming message the `ActorSubscriber` will automatically invoke
* it will not receive any elements. // * the [[RequestStrategy#requestDemand]] and propagate the returned demand to the stream.
* // * The provided [[WatermarkRequestStrategy]] is a good strategy if the actor
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. // * performs work itself.
*/ // * The provided [[MaxInFlightRequestStrategy]] is useful if messages are
@deprecated( // * queued internally or delegated to other actors.
"Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", // * You can also implement a custom [[RequestStrategy]] or call [[#request]] manually
since = "2.5.0") // * together with [[ZeroRequestStrategy]] or some other strategy. In that case
trait ActorSubscriber extends Actor { // * you must also call [[#request]] when the actor is started or when it is ready, otherwise
import ActorSubscriber._ // * it will not receive any elements.
import ActorSubscriberMessage._ // *
// * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
private[this] val state = ActorSubscriberState(context.system) // */
private[this] var subscription: Option[Subscription] = None //@deprecated(
private[this] var requested: Long = 0 // "Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
private[this] var _canceled = false // since = "2.5.0")
//trait ActorSubscriber extends Actor {
protected def requestStrategy: RequestStrategy // import ActorSubscriber._
// import ActorSubscriberMessage._
final def canceled: Boolean = _canceled //
// private[this] val state = ActorSubscriberState(context.system)
/** // private[this] var subscription: Option[Subscription] = None
* INTERNAL API // private[this] var requested: Long = 0
*/ // private[this] var _canceled = false
protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match { //
case _: OnNext => // protected def requestStrategy: RequestStrategy
requested -= 1 //
if (!_canceled) { // final def canceled: Boolean = _canceled
super.aroundReceive(receive, msg) //
request(requestStrategy.requestDemand(remainingRequested)) // /**
} // * INTERNAL API
case OnSubscribe(sub) => // */
if (subscription.isEmpty) { // protected[akka] override def aroundReceive(receive: Receive, msg: Any): Unit = msg match {
subscription = Some(sub) // case _: OnNext =>
if (_canceled) { // requested -= 1
context.stop(self) // if (!_canceled) {
sub.cancel() // super.aroundReceive(receive, msg)
} else if (requested != 0) // request(requestStrategy.requestDemand(remainingRequested))
sub.request(remainingRequested) // }
} else // case OnSubscribe(sub) =>
sub.cancel() // if (subscription.isEmpty) {
case OnComplete | OnError(_) => // subscription = Some(sub)
if (!_canceled) { // if (_canceled) {
_canceled = true // context.stop(self)
super.aroundReceive(receive, msg) // sub.cancel()
} // } else if (requested != 0)
case _ => // sub.request(remainingRequested)
super.aroundReceive(receive, msg) // } else
request(requestStrategy.requestDemand(remainingRequested)) // sub.cancel()
} // case OnComplete | OnError(_) =>
// if (!_canceled) {
/** // _canceled = true
* INTERNAL API // super.aroundReceive(receive, msg)
*/ // }
protected[akka] override def aroundPreStart(): Unit = { // case _ =>
super.aroundPreStart() // super.aroundReceive(receive, msg)
request(requestStrategy.requestDemand(remainingRequested)) // request(requestStrategy.requestDemand(remainingRequested))
} // }
//
/** // /**
* INTERNAL API // * INTERNAL API
*/ // */
protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { // protected[akka] override def aroundPreStart(): Unit = {
state.get(self).foreach { s => // super.aroundPreStart()
// restore previous state // request(requestStrategy.requestDemand(remainingRequested))
subscription = s.subscription // }
requested = s.requested //
_canceled = s.canceled // /**
} // * INTERNAL API
state.remove(self) // */
super.aroundPostRestart(reason) // protected[akka] override def aroundPostRestart(reason: Throwable): Unit = {
request(requestStrategy.requestDemand(remainingRequested)) // state.get(self).foreach { s =>
} // // restore previous state
// subscription = s.subscription
/** // requested = s.requested
* INTERNAL API // _canceled = s.canceled
*/ // }
protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { // state.remove(self)
// some state must survive restart // super.aroundPostRestart(reason)
state.set(self, ActorSubscriberState.State(subscription, requested, _canceled)) // request(requestStrategy.requestDemand(remainingRequested))
super.aroundPreRestart(reason, message) // }
} //
// /**
/** // * INTERNAL API
* INTERNAL API // */
*/ // protected[akka] override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
protected[akka] override def aroundPostStop(): Unit = { // // some state must survive restart
state.remove(self) // state.set(self, ActorSubscriberState.State(subscription, requested, _canceled))
if (!_canceled) subscription.foreach(_.cancel()) // super.aroundPreRestart(reason, message)
super.aroundPostStop() // }
} //
// /**
/** // * INTERNAL API
* Request a number of elements from upstream. // */
*/ // protected[akka] override def aroundPostStop(): Unit = {
protected def request(elements: Long): Unit = // state.remove(self)
if (elements > 0 && !_canceled) { // if (!_canceled) subscription.foreach(_.cancel())
// if we don't have a subscription yet, it will be requested when it arrives // super.aroundPostStop()
subscription.foreach(_.request(elements)) // }
requested += elements //
} // /**
// * Request a number of elements from upstream.
/** // */
* Cancel upstream subscription. // protected def request(elements: Long): Unit =
* No more elements will be delivered after cancel. // if (elements > 0 && !_canceled) {
* // // if we don't have a subscription yet, it will be requested when it arrives
* The [[ActorSubscriber]] will be stopped immediately after signaling cancellation. // subscription.foreach(_.request(elements))
* In case the upstream subscription has not yet arrived the Actor will stay alive // requested += elements
* until a subscription arrives, cancel it and then stop itself. // }
*/ //
protected def cancel(): Unit = // /**
if (!_canceled) { // * Cancel upstream subscription.
subscription match { // * No more elements will be delivered after cancel.
case Some(s) => // *
context.stop(self) // * The [[ActorSubscriber]] will be stopped immediately after signaling cancellation.
s.cancel() // * In case the upstream subscription has not yet arrived the Actor will stay alive
case _ => // * until a subscription arrives, cancel it and then stop itself.
_canceled = true // cancel will be signaled once a subscription arrives // */
} // protected def cancel(): Unit =
} // if (!_canceled) {
// subscription match {
/** // case Some(s) =>
* The number of stream elements that have already been requested from upstream // context.stop(self)
* but not yet received. // s.cancel()
*/ // case _ =>
protected def remainingRequested: Int = longToIntMax(requested) // _canceled = true // cancel will be signaled once a subscription arrives
// }
private def longToIntMax(n: Long): Int = // }
if (n > Int.MaxValue) Int.MaxValue //
else n.toInt // /**
} // * The number of stream elements that have already been requested from upstream
// * but not yet received.
/** // */
* INTERNAL API // protected def remainingRequested: Int = longToIntMax(requested)
*/ //
private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] { // private def longToIntMax(n: Long): Int =
import ActorSubscriberMessage._ // if (n > Int.MaxValue) Int.MaxValue
override def onError(cause: Throwable): Unit = { // else n.toInt
ReactiveStreamsCompliance.requireNonNullException(cause) //}
impl ! OnError(cause) //
} ///**
override def onComplete(): Unit = impl ! OnComplete // * INTERNAL API
override def onNext(element: T): Unit = { // */
ReactiveStreamsCompliance.requireNonNullElement(element) //private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] {
impl ! OnNext(element) // import ActorSubscriberMessage._
} // override def onError(cause: Throwable): Unit = {
override def onSubscribe(subscription: Subscription): Unit = { // ReactiveStreamsCompliance.requireNonNullException(cause)
ReactiveStreamsCompliance.requireNonNullSubscription(subscription) // impl ! OnError(cause)
impl ! ActorSubscriber.OnSubscribe(subscription) // }
} // override def onComplete(): Unit = impl ! OnComplete
} // override def onNext(element: T): Unit = {
// ReactiveStreamsCompliance.requireNonNullElement(element)
/** // impl ! OnNext(element)
* INTERNAL API // }
* Some state must survive restarts. // override def onSubscribe(subscription: Subscription): Unit = {
*/ // ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider { // impl ! ActorSubscriber.OnSubscribe(subscription)
override def get(system: ActorSystem): ActorSubscriberState = super.get(system) // }
//}
override def lookup() = ActorSubscriberState //
///**
override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = // * INTERNAL API
new ActorSubscriberState // * Some state must survive restarts.
// */
final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) //private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider {
// override def get(system: ActorSystem): ActorSubscriberState = super.get(system)
} //
// override def lookup() = ActorSubscriberState
/** //
* INTERNAL API // override def createExtension(system: ExtendedActorSystem): ActorSubscriberState =
*/ // new ActorSubscriberState
private[akka] class ActorSubscriberState extends Extension { //
import ActorSubscriberState.State // final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean)
private val state = new ConcurrentHashMap[ActorRef, State] //
//}
def get(ref: ActorRef): Option[State] = Option(state.get(ref)) //
///**
def set(ref: ActorRef, s: State): Unit = state.put(ref, s) // * INTERNAL API
// */
def remove(ref: ActorRef): Unit = state.remove(ref) //private[akka] class ActorSubscriberState extends Extension {
} // import ActorSubscriberState.State
// private val state = new ConcurrentHashMap[ActorRef, State]
//
// def get(ref: ActorRef): Option[State] = Option(state.get(ref))
//
// def set(ref: ActorRef, s: State): Unit = state.put(ref, s)
//
// def remove(ref: ActorRef): Unit = state.remove(ref)
//}

View file

@ -7,8 +7,7 @@ package akka.stream.impl
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.{ AbruptTerminationException, Attributes } import akka.stream.{ AbruptTerminationException, Attributes }
import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe }
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext }
import org.reactivestreams.{ Processor, Subscriber, Subscription } import org.reactivestreams.{ Processor, Subscriber, Subscription }
import akka.event.Logging import akka.event.Logging
import akka.stream.ActorAttributes import akka.stream.ActorAttributes

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.actor.{ Actor, ActorRef, Terminated } import akka.actor.{ Actor, ActorRef, Terminated }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.DeadLetterSuppression
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import org.reactivestreams.Subscription
/**
* INTERNAL API
*/
@InternalApi private[akka] sealed abstract class ActorSubscriberMessage
extends DeadLetterSuppression
with NoSerializationVerificationNeeded
/**
* INTERNAL API
*/
@InternalApi private[akka] object ActorSubscriberMessage {
final case class OnNext(element: Any) extends ActorSubscriberMessage
final case class OnError(cause: Throwable) extends ActorSubscriberMessage
case object OnComplete extends ActorSubscriberMessage
// OnSubscribe doesn't extend ActorSubscriberMessage by design, because `OnNext`, `OnError` and `OnComplete`
// are used together, with the same `seal`, but not always `OnSubscribe`.
final case class OnSubscribe(subscription: Subscription)
extends DeadLetterSuppression
with NoSerializationVerificationNeeded
}

View file

@ -9,7 +9,6 @@ import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream.ActorAttributes import akka.stream.ActorAttributes
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.AbruptTerminationException import akka.stream.AbruptTerminationException
import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage }
import akka.util.unused import akka.util.unused
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
@ -236,7 +235,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
def subreceive: SubReceive = def subreceive: SubReceive =
new SubReceive({ new SubReceive({
case OnSubscribe(id, subscription) => case OnSubscribe(id, subscription) =>
inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription)) inputs(id).subreceive(ActorSubscriberMessage.OnSubscribe(subscription))
case OnNext(id, elem) => case OnNext(id, elem) =>
if (marked(id) && !pending(id)) markedPending += 1 if (marked(id) && !pending(id)) markedPending += 1
pending(id, on = true) pending(id, on = true)

View file

@ -5,7 +5,6 @@
package akka.stream.impl package akka.stream.impl
import akka.NotUsed import akka.NotUsed
import akka.actor._
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.StreamLayout.AtomicModule
@ -13,7 +12,6 @@ import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import akka.event.Logging import akka.event.Logging
import com.github.ghik.silencer.silent
/** /**
* INTERNAL API * INTERNAL API
@ -87,26 +85,3 @@ import com.github.ghik.silencer.silent
override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] = override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] =
new PublisherSource[Out](p, attr, amendShape(attr)) new PublisherSource[Out](p, attr, amendShape(attr))
} }
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
@InternalApi private[akka] final class ActorPublisherSource[Out](
props: Props,
val attributes: Attributes,
shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) {
@silent("deprecated")
override def create(context: MaterializationContext) = {
val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorPublisherSource[Out](props, attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[Out, ActorRef] =
new ActorPublisherSource(props, attr, amendShape(attr))
}

View file

@ -7,8 +7,6 @@ package akka.stream.impl
import java.util.function.BinaryOperator import java.util.function.BinaryOperator
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.Props
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
@ -165,28 +163,6 @@ import scala.util.control.NonFatal
override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr)) override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr))
} }
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
@InternalApi private[akka] final class ActorSubscriberSink[In](
props: Props,
val attributes: Attributes,
shape: SinkShape[In])
extends SinkModule[In, ActorRef](shape) {
override def create(context: MaterializationContext) = {
val subscriberRef = context.materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] =
new ActorSubscriberSink[In](props, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, ActorRef] =
new ActorSubscriberSink[In](props, attr, amendShape(attr))
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -12,7 +12,7 @@ import akka.annotation.InternalApi
import akka.stream.ActorAttributes.StreamSubscriptionTimeout import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._ import akka.stream._
import akka.stream.actor.ActorSubscriberMessage import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.TraversalBuilder import akka.stream.impl.TraversalBuilder

View file

@ -9,7 +9,6 @@ import akka.actor.{ ActorRef, Terminated }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.event.Logging import akka.event.Logging
import akka.stream._ import akka.stream._
import akka.stream.actor.{ RequestStrategy, WatermarkRequestStrategy }
import akka.stream.impl.FixedSizeBuffer import akka.stream.impl.FixedSizeBuffer
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.stream.stage._ import akka.stream.stage._
@ -28,6 +27,43 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
*/ */
@InternalApi private[stream] object SourceRefStageImpl { @InternalApi private[stream] object SourceRefStageImpl {
private sealed trait ActorRefStage { def ref: ActorRef } private sealed trait ActorRefStage { def ref: ActorRef }
object WatermarkRequestStrategy {
/**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
* the specified `highWatermark`.
*/
def apply(highWatermark: Int): WatermarkRequestStrategy = new WatermarkRequestStrategy(highWatermark)
}
/**
* Requests up to the `highWatermark` when the `remainingRequested` is
* below the `lowWatermark`. This a good strategy when the actor performs work itself.
*/
final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) {
require(lowWatermark >= 0, "lowWatermark must be >= 0")
require(highWatermark >= lowWatermark, "highWatermark must be >= lowWatermark")
/**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
* the specified `highWatermark`.
*/
def this(highWatermark: Int) = this(highWatermark, lowWatermark = math.max(1, highWatermark / 2))
/**
* Invoked after each incoming message to determine how many more elements to request from the stream.
*
* @param remainingRequested current remaining number of elements that
* have been requested from upstream but not received yet
* @return demand of more elements from the stream, returning 0 means that no
* more elements will be requested for now
*/
def requestDemand(remainingRequested: Int): Int =
if (remainingRequested < lowWatermark)
highWatermark - remainingRequested
else 0
}
} }
/** /**
@ -40,6 +76,7 @@ private[stream] final case class SourceRefImpl[T](initialPartnerRef: ActorRef) e
private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef]) private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: OptionVal[ActorRef])
extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage => extends GraphStageWithMaterializedValue[SourceShape[Out], SinkRef[Out]] { stage =>
import SourceRefStageImpl.ActorRefStage import SourceRefStageImpl.ActorRefStage
import SourceRefStageImpl.WatermarkRequestStrategy
val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out") val out: Outlet[Out] = Outlet[Out](s"${Logging.simpleName(getClass)}.out")
override def shape = SourceShape.of(out) override def shape = SourceShape.of(out)
@ -106,7 +143,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio
private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity) private val receiveBuffer = FixedSizeBuffer[Out](bufferCapacity)
private val requestStrategy: RequestStrategy = WatermarkRequestStrategy(highWatermark = receiveBuffer.capacity) private val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy(
highWatermark = receiveBuffer.capacity)
// end of demand management --- // end of demand management ---
// initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin") // initialized with the originRef if present, that means we're the "remote" for an already active Source on the other side (the "origin")

View file

@ -12,7 +12,7 @@ import akka.annotation.InternalApi
import akka.japi.function.{ Effect, Procedure } import akka.japi.function.{ Effect, Procedure }
import akka.pattern.ask import akka.pattern.ask
import akka.stream._ import akka.stream._
import akka.stream.actor.ActorSubscriberMessage import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource }
import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder }
import akka.stream.scaladsl.GenericGraphWithChangedAttributes import akka.stream.scaladsl.GenericGraphWithChangedAttributes