diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index b3df31b61e..8395189f7a 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -73,6 +73,8 @@ Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead. * `CircuitBreaker.onOpen` use `CircuitBreaker.addOnOpenListener` * `CircuitBreaker.onHalfOpen` use `CircuitBreaker.addOnHalfOpenListener` * `CircuitBreaker.onClose` use `CircuitBreaker.addOnCloseListener` +* `Source.actorSubscriber`, use `Source.fromGraph` instead. +* `Source.actorActorPublisher`, use `Source.fromGraph` instead. ### JavaTestKit removed diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index b9d3ccf945..69e364f2a5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -10,7 +10,7 @@ import akka.stream.ActorMaterializerSpec.ActorWithMaterializer import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit.{ StreamSpec, TestPublisher } -import akka.testkit.{ ImplicitSender, TestActor, TestProbe } +import akka.testkit.{ ImplicitSender, TestProbe } import com.github.ghik.silencer.silent import scala.concurrent.Await @@ -84,14 +84,6 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender { val Failure(_) = p.expectMsgType[Try[Done]] } - "handle properly broken Props" in { - val m = ActorMaterializer.create(system) - an[IllegalArgumentException] should be thrownBy - Await.result( - Source.actorPublisher(Props(classOf[TestActor], "wrong", "arguments")).runWith(Sink.head)(m), - 3.seconds) - } - "report correctly if it has been shut down from the side" in { val sys = ActorSystem() val m = ActorMaterializer.create(sys) diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index a0d1d3010e..e34395a25c 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -112,6 +112,12 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber$ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSink") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.actorPublisher") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Sink.actorSubscriber") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.actorPublisher") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Sink.actorSubscriber") + + # #25045 adding Java/Scala interop to SourceQueue and SinkQueue ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index ad01a49e1a..b57b0d5320 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -7,7 +7,7 @@ package akka.stream.javadsl import java.util.Optional import akka.{ japi, Done, NotUsed } -import akka.actor.{ ActorRef, Props } +import akka.actor.ActorRef import akka.dispatch.ExecutionContexts import akka.japi.function import akka.stream.impl.LinearTraversalBuilder @@ -264,19 +264,6 @@ object Sink { new Sink( scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) - /** - * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorSubscriber]]. - * - * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ - @deprecated( - "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") - def actorSubscriber[T](props: Props): Sink[T, ActorRef] = - new Sink(scaladsl.Sink.actorSubscriber(props)) - /** * A graph with the shape of a sink logically is a sink, this method makes * it so also in type. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 0a9d6a9ecd..d67b8efdbd 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -7,7 +7,7 @@ package akka.stream.javadsl import java.util import java.util.Optional -import akka.actor.{ ActorRef, Cancellable, Props } +import akka.actor.{ ActorRef, Cancellable } import akka.event.LoggingAdapter import akka.japi.{ function, Pair, Util } import akka.stream._ @@ -286,19 +286,6 @@ object Source { def asSubscriber[T](): Source[T, Subscriber[T]] = new Source(scaladsl.Source.asSubscriber) - /** - * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should - * be [[akka.stream.actor.ActorPublisher]]. - * - * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ - @deprecated( - "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") - def actorPublisher[T](props: Props): Source[T, ActorRef] = - new Source(scaladsl.Source.actorPublisher(props)) - /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Messages sent to this actor will be emitted to the stream if there is demand from downstream, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 500c473e4e..66a2a53673 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -6,9 +6,8 @@ package akka.stream.scaladsl import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts -import akka.actor.{ ActorRef, Props, Status } +import akka.actor.{ ActorRef, Status } import akka.annotation.InternalApi -import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages @@ -17,7 +16,6 @@ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec -import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } import scala.collection.immutable @@ -518,21 +516,6 @@ object Sink { onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] = actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) - /** - * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must - * be [[akka.stream.actor.ActorSubscriber]]. - * - * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ - @deprecated( - "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") - def actorSubscriber[T](props: Props): Sink[T, ActorRef] = { - require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber") - fromGraph(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) - } - /** * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]]. * [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5b08b3ed1a..1c23f0d05a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -6,9 +6,8 @@ package akka.stream.scaladsl import java.util.concurrent.CompletionStage -import akka.actor.{ ActorRef, Cancellable, Props } +import akka.actor.{ ActorRef, Cancellable } import akka.annotation.InternalApi -import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages._ @@ -457,21 +456,6 @@ object Source { def asSubscriber[T]: Source[T, Subscriber[T]] = fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) - /** - * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must - * be [[akka.stream.actor.ActorPublisher]]. - * - * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant. - */ - @deprecated( - "Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", - since = "2.5.0") - def actorPublisher[T](props: Props): Source[T, ActorRef] = { - require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher") - fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) - } - /** * INTERNAL API *