=str deprecate ActorPublisher/Subscriber, use GraphStage (#21952)
* =str deprecate ActorPublisher/Subscriber, use GraphStage * =str deprecate Source.actorPublisher / Sink.actorSubscriber * =str added deprecation note of ActorPublisher,Subscriber
This commit is contained in:
parent
591eafe04c
commit
2ea8cd7410
9 changed files with 90 additions and 0 deletions
|
|
@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre
|
||||||
ActorPublisher
|
ActorPublisher
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
**Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more
|
||||||
|
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
|
||||||
|
expose a "stage actor ref" is needed to be addressed as-if an Actor.
|
||||||
|
Custom stages implemented using ``GraphStage`` are also automatically fusable.
|
||||||
|
|
||||||
|
To learn more about implementing custom stages using it refer to :ref:`graphstage-java`.
|
||||||
|
|
||||||
Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a
|
Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a
|
||||||
stream publisher that keeps track of the subscription life cycle and requested elements.
|
stream publisher that keeps track of the subscription life cycle and requested elements.
|
||||||
|
|
||||||
|
|
@ -482,6 +490,14 @@ attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscr
|
||||||
ActorSubscriber
|
ActorSubscriber
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
**Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more
|
||||||
|
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
|
||||||
|
expose a "stage actor ref" is needed to be addressed as-if an Actor.
|
||||||
|
Custom stages implemented using ``GraphStage`` are also automatically fusable.
|
||||||
|
|
||||||
|
To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.
|
||||||
|
|
||||||
Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
|
Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
|
||||||
full control of stream back pressure. It will receive
|
full control of stream back pressure. It will receive
|
||||||
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
|
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,23 @@ would now be::
|
||||||
|
|
||||||
as the ``GraphStage`` itself is a factory of logic instances.
|
as the ``GraphStage`` itself is a factory of logic instances.
|
||||||
|
|
||||||
|
Deprecation of ActorSubscriber and ActorPublisher
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
The classes ``ActorPublisher`` and ``ActorSubscriber`` were the first user-facing Reactive Streams integration
|
||||||
|
API that we provided for end-users. Akka Streams APIs have evolved and improved a lot since then, and now
|
||||||
|
there is no need to use these low-level abstractions anymore. It is easy to get things wrong when implementing them,
|
||||||
|
and one would have to validate each implementation of such Actor using the Reactive Streams Technology Compatibility Kit.
|
||||||
|
|
||||||
|
The replacement API is the powerful ``GraphStage``. It has all features that raw Actors provided for implementing Stream
|
||||||
|
stages and adds additional protocol and type-safety. You can learn all about it in the documentation:
|
||||||
|
:ref:`stream-customize-scala`and :ref:`Custom stream processing in JavaDSL <stream-customize-java>`.
|
||||||
|
|
||||||
|
You should also read the blog post series on the official team blog, starting with `Mastering GraphStages, part I`_,
|
||||||
|
which explains using and implementing GraphStages in more practical terms than the reference documentation.
|
||||||
|
|
||||||
|
.. _Mastering GraphStages, part I: http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1
|
||||||
|
|
||||||
Agents
|
Agents
|
||||||
======
|
======
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre
|
||||||
ActorPublisher
|
ActorPublisher
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
**Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more
|
||||||
|
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
|
||||||
|
expose a "stage actor ref" is needed to be addressed as-if an Actor.
|
||||||
|
Custom stages implemented using ``GraphStage`` are also automatically fusable.
|
||||||
|
|
||||||
|
To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.
|
||||||
|
|
||||||
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
|
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
|
||||||
stream publisher that keeps track of the subscription life cycle and requested elements.
|
stream publisher that keeps track of the subscription life cycle and requested elements.
|
||||||
|
|
||||||
|
|
@ -482,6 +490,14 @@ subscription attempts will be rejected with an :class:`IllegalStateException`.
|
||||||
ActorSubscriber
|
ActorSubscriber
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
**Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more
|
||||||
|
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
|
||||||
|
expose a "stage actor ref" is needed to be addressed as-if an Actor.
|
||||||
|
Custom stages implemented using ``GraphStage`` are also automatically fusable.
|
||||||
|
|
||||||
|
To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.
|
||||||
|
|
||||||
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
|
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
|
||||||
stream subscriber with full control of stream back pressure. It will receive
|
stream subscriber with full control of stream back pressure. It will receive
|
||||||
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
|
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import concurrent.duration.FiniteDuration
|
||||||
import akka.stream.impl.CancelledSubscription
|
import akka.stream.impl.CancelledSubscription
|
||||||
import akka.stream.impl.ReactiveStreamsCompliance._
|
import akka.stream.impl.ReactiveStreamsCompliance._
|
||||||
|
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
object ActorPublisher {
|
object ActorPublisher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -120,7 +121,10 @@ object ActorPublisherMessage {
|
||||||
*
|
*
|
||||||
* If the actor is stopped the stream will be completed, unless it was not already terminated with
|
* If the actor is stopped the stream will be completed, unless it was not already terminated with
|
||||||
* failure, completed or canceled.
|
* failure, completed or canceled.
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
trait ActorPublisher[T] extends Actor {
|
trait ActorPublisher[T] extends Actor {
|
||||||
import ActorPublisher.Internal._
|
import ActorPublisher.Internal._
|
||||||
import ActorPublisherMessage._
|
import ActorPublisherMessage._
|
||||||
|
|
@ -450,6 +454,7 @@ object UntypedActorPublisher {
|
||||||
* Java API
|
* Java API
|
||||||
* @see [[akka.stream.actor.ActorPublisher]]
|
* @see [[akka.stream.actor.ActorPublisher]]
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T]
|
abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -467,26 +472,38 @@ object AbstractActorPublisher {
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions
|
* Java API compatible with lambda expressions
|
||||||
* @see [[akka.stream.actor.ActorPublisher]]
|
* @see [[akka.stream.actor.ActorPublisher]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T]
|
abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions.
|
* Java API compatible with lambda expressions.
|
||||||
* This class adds a Stash to {@link AbstractActorPublisher}.
|
* This class adds a Stash to {@link AbstractActorPublisher}.
|
||||||
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]]
|
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash
|
abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions.
|
* Java API compatible with lambda expressions.
|
||||||
* This class adds an unbounded Stash to {@link AbstractActorPublisher}.
|
* This class adds an unbounded Stash to {@link AbstractActorPublisher}.
|
||||||
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]]
|
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class AbstractActorPublisherWithUnboundedStash[T] extends AbstractActor with ActorPublisher[T] with UnboundedStash
|
abstract class AbstractActorPublisherWithUnboundedStash[T] extends AbstractActor with ActorPublisher[T] with UnboundedStash
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions.
|
* Java API compatible with lambda expressions.
|
||||||
* This class adds an unrestricted Stash to {@link AbstractActorPublisher}.
|
* This class adds an unrestricted Stash to {@link AbstractActorPublisher}.
|
||||||
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]]
|
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class AbstractActorPublisherWithUnrestrictedStash[T] extends AbstractActor with ActorPublisher[T] with UnrestrictedStash
|
abstract class AbstractActorPublisherWithUnrestrictedStash[T] extends AbstractActor with ActorPublisher[T] with UnrestrictedStash
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,10 @@ abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy {
|
||||||
* together with [[ZeroRequestStrategy]] or some other strategy. In that case
|
* together with [[ZeroRequestStrategy]] or some other strategy. In that case
|
||||||
* you must also call [[#request]] when the actor is started or when it is ready, otherwise
|
* you must also call [[#request]] when the actor is started or when it is ready, otherwise
|
||||||
* it will not receive any elements.
|
* it will not receive any elements.
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
trait ActorSubscriber extends Actor {
|
trait ActorSubscriber extends Actor {
|
||||||
import ActorSubscriber._
|
import ActorSubscriber._
|
||||||
import ActorSubscriberMessage._
|
import ActorSubscriberMessage._
|
||||||
|
|
@ -347,12 +350,18 @@ object UntypedActorSubscriber {
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
* @see [[akka.stream.actor.ActorSubscriber]]
|
* @see [[akka.stream.actor.ActorSubscriber]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber
|
abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions
|
* Java API compatible with lambda expressions
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
object AbstractActorSubscriber {
|
object AbstractActorSubscriber {
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor
|
* Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor
|
||||||
|
|
@ -365,5 +374,8 @@ object AbstractActorSubscriber {
|
||||||
/**
|
/**
|
||||||
* Java API compatible with lambda expressions
|
* Java API compatible with lambda expressions
|
||||||
* @see [[akka.stream.actor.ActorSubscriber]]
|
* @see [[akka.stream.actor.ActorSubscriber]]
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber
|
abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,10 @@ object Sink {
|
||||||
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||||
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
|
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
|
||||||
* be [[akka.stream.actor.ActorSubscriber]].
|
* be [[akka.stream.actor.ActorSubscriber]].
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
|
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
|
||||||
new Sink(scaladsl.Sink.actorSubscriber(props))
|
new Sink(scaladsl.Sink.actorSubscriber(props))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -238,7 +238,10 @@ object Source {
|
||||||
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||||
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
|
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
|
||||||
* be [[akka.stream.actor.ActorPublisher]].
|
* be [[akka.stream.actor.ActorPublisher]].
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
def actorPublisher[T](props: Props): Source[T, ActorRef] =
|
def actorPublisher[T](props: Props): Source[T, ActorRef] =
|
||||||
new Source(scaladsl.Source.actorPublisher(props))
|
new Source(scaladsl.Source.actorPublisher(props))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -350,7 +350,10 @@ object Sink {
|
||||||
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||||
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
|
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
|
||||||
* be [[akka.stream.actor.ActorSubscriber]].
|
* be [[akka.stream.actor.ActorSubscriber]].
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
|
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
|
||||||
require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
|
require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
|
||||||
new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
|
new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
|
||||||
|
|
|
||||||
|
|
@ -373,7 +373,10 @@ object Source {
|
||||||
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
|
||||||
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
|
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
|
||||||
* be [[akka.stream.actor.ActorPublisher]].
|
* be [[akka.stream.actor.ActorPublisher]].
|
||||||
|
*
|
||||||
|
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
|
||||||
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
|
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
|
||||||
require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
|
require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
|
||||||
new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
|
new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue