diff --git a/akka-docs/rst/java/stream/stream-integrations.rst b/akka-docs/rst/java/stream/stream-integrations.rst index aef739e108..d23e616e91 100644 --- a/akka-docs/rst/java/stream/stream-integrations.rst +++ b/akka-docs/rst/java/stream/stream-integrations.rst @@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre ActorPublisher -------------- +.. warning:: + **Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more + type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also + expose a "stage actor ref" is needed to be addressed as-if an Actor. + Custom stages implemented using ``GraphStage`` are also automatically fusable. + + To learn more about implementing custom stages using it refer to :ref:`graphstage-java`. + Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a stream publisher that keeps track of the subscription life cycle and requested elements. @@ -482,6 +490,14 @@ attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscr ActorSubscriber --------------- +.. warning:: + **Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more + type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also + expose a "stage actor ref" is needed to be addressed as-if an Actor. + Custom stages implemented using ``GraphStage`` are also automatically fusable. + + To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`. + Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with full control of stream back pressure. It will receive ``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index cf74da0544..1eeadafd5f 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -34,6 +34,23 @@ would now be:: as the ``GraphStage`` itself is a factory of logic instances. +Deprecation of ActorSubscriber and ActorPublisher +------------------------------------------------- + +The classes ``ActorPublisher`` and ``ActorSubscriber`` were the first user-facing Reactive Streams integration +API that we provided for end-users. Akka Streams APIs have evolved and improved a lot since then, and now +there is no need to use these low-level abstractions anymore. It is easy to get things wrong when implementing them, +and one would have to validate each implementation of such Actor using the Reactive Streams Technology Compatibility Kit. + +The replacement API is the powerful ``GraphStage``. It has all features that raw Actors provided for implementing Stream +stages and adds additional protocol and type-safety. You can learn all about it in the documentation: +:ref:`stream-customize-scala`and :ref:`Custom stream processing in JavaDSL `. + +You should also read the blog post series on the official team blog, starting with `Mastering GraphStages, part I`_, +which explains using and implementing GraphStages in more practical terms than the reference documentation. + +.. _Mastering GraphStages, part I: http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1 + Agents ====== diff --git a/akka-docs/rst/scala/stream/stream-integrations.rst b/akka-docs/rst/scala/stream/stream-integrations.rst index 33e8658d48..5aafbeb201 100644 --- a/akka-docs/rst/scala/stream/stream-integrations.rst +++ b/akka-docs/rst/scala/stream/stream-integrations.rst @@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre ActorPublisher -------------- +.. warning:: + **Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more + type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also + expose a "stage actor ref" is needed to be addressed as-if an Actor. + Custom stages implemented using ``GraphStage`` are also automatically fusable. + + To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`. + Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a stream publisher that keeps track of the subscription life cycle and requested elements. @@ -482,6 +490,14 @@ subscription attempts will be rejected with an :class:`IllegalStateException`. ActorSubscriber --------------- +.. warning:: + **Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more + type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also + expose a "stage actor ref" is needed to be addressed as-if an Actor. + Custom stages implemented using ``GraphStage`` are also automatically fusable. + + To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`. + Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a stream subscriber with full control of stream back pressure. It will receive ``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index cf648a3be5..bc64f12e7d 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -12,6 +12,7 @@ import concurrent.duration.FiniteDuration import akka.stream.impl.CancelledSubscription import akka.stream.impl.ReactiveStreamsCompliance._ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") object ActorPublisher { /** @@ -120,7 +121,10 @@ object ActorPublisherMessage { * * If the actor is stopped the stream will be completed, unless it was not already terminated with * failure, completed or canceled. + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") trait ActorPublisher[T] extends Actor { import ActorPublisher.Internal._ import ActorPublisherMessage._ @@ -450,6 +454,7 @@ object UntypedActorPublisher { * Java API * @see [[akka.stream.actor.ActorPublisher]] */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T] /** @@ -467,26 +472,38 @@ object AbstractActorPublisher { /** * Java API compatible with lambda expressions * @see [[akka.stream.actor.ActorPublisher]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T] /** * Java API compatible with lambda expressions. * This class adds a Stash to {@link AbstractActorPublisher}. * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash /** * Java API compatible with lambda expressions. * This class adds an unbounded Stash to {@link AbstractActorPublisher}. * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class AbstractActorPublisherWithUnboundedStash[T] extends AbstractActor with ActorPublisher[T] with UnboundedStash /** * Java API compatible with lambda expressions. * This class adds an unrestricted Stash to {@link AbstractActorPublisher}. * @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class AbstractActorPublisherWithUnrestrictedStash[T] extends AbstractActor with ActorPublisher[T] with UnrestrictedStash diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index 3d28018e3f..777c5a6909 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -158,7 +158,10 @@ abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy { * together with [[ZeroRequestStrategy]] or some other strategy. In that case * you must also call [[#request]] when the actor is started or when it is ready, otherwise * it will not receive any elements. + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") trait ActorSubscriber extends Actor { import ActorSubscriber._ import ActorSubscriberMessage._ @@ -347,12 +350,18 @@ object UntypedActorSubscriber { /** * Java API * @see [[akka.stream.actor.ActorSubscriber]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber /** * Java API compatible with lambda expressions + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") object AbstractActorSubscriber { /** * Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor @@ -365,5 +374,8 @@ object AbstractActorSubscriber { /** * Java API compatible with lambda expressions * @see [[akka.stream.actor.ActorSubscriber]] + * + * @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ +@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index b9ad9d4e2a..930eb94d21 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -213,7 +213,10 @@ object Sink { * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * be [[akka.stream.actor.ActorSubscriber]]. + * + * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ + @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") def actorSubscriber[T](props: Props): Sink[T, ActorRef] = new Sink(scaladsl.Sink.actorSubscriber(props)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index f9510c1706..6340884323 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -238,7 +238,10 @@ object Source { * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * be [[akka.stream.actor.ActorPublisher]]. + * + * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ + @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") def actorPublisher[T](props: Props): Source[T, ActorRef] = new Source(scaladsl.Source.actorPublisher(props)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 895d48acc1..12e191f7d0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -350,7 +350,10 @@ object Sink { * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must * be [[akka.stream.actor.ActorSubscriber]]. + * + * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ + @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") def actorSubscriber[T](props: Props): Sink[T, ActorRef] = { require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber") new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 34e09ec9f3..c3b6d94d26 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -373,7 +373,10 @@ object Source { * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must * be [[akka.stream.actor.ActorPublisher]]. + * + * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. */ + @deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0") def actorPublisher[T](props: Props): Source[T, ActorRef] = { require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher") new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))