Remove source/sink fromPublisher/fromSubscriber (#27288)

* Remove source/sink fromPublisher/fromSubscriber

Refs #26187

* mima
This commit is contained in:
Christopher Batey 2019-07-25 12:16:30 +01:00 committed by Arnout Engelen
parent ba2384893f
commit 09838a71e5
7 changed files with 13 additions and 72 deletions

View file

@ -73,6 +73,8 @@ Use @apidoc[AbstractPersistentActorWithAtLeastOnceDelivery] instead.
* `CircuitBreaker.onOpen` use `CircuitBreaker.addOnOpenListener` * `CircuitBreaker.onOpen` use `CircuitBreaker.addOnOpenListener`
* `CircuitBreaker.onHalfOpen` use `CircuitBreaker.addOnHalfOpenListener` * `CircuitBreaker.onHalfOpen` use `CircuitBreaker.addOnHalfOpenListener`
* `CircuitBreaker.onClose` use `CircuitBreaker.addOnCloseListener` * `CircuitBreaker.onClose` use `CircuitBreaker.addOnCloseListener`
* `Source.actorSubscriber`, use `Source.fromGraph` instead.
* `Source.actorActorPublisher`, use `Source.fromGraph` instead.
### JavaTestKit removed ### JavaTestKit removed

View file

@ -10,7 +10,7 @@ import akka.stream.ActorMaterializerSpec.ActorWithMaterializer
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Sink, Source } import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.testkit.{ ImplicitSender, TestActor, TestProbe } import akka.testkit.{ ImplicitSender, TestProbe }
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import scala.concurrent.Await import scala.concurrent.Await
@ -84,14 +84,6 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
val Failure(_) = p.expectMsgType[Try[Done]] val Failure(_) = p.expectMsgType[Try[Done]]
} }
"handle properly broken Props" in {
val m = ActorMaterializer.create(system)
an[IllegalArgumentException] should be thrownBy
Await.result(
Source.actorPublisher(Props(classOf[TestActor], "wrong", "arguments")).runWith(Sink.head)(m),
3.seconds)
}
"report correctly if it has been shut down from the side" in { "report correctly if it has been shut down from the side" in {
val sys = ActorSystem() val sys = ActorSystem()
val m = ActorMaterializer.create(sys) val m = ActorMaterializer.create(sys)

View file

@ -112,6 +112,12 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber$
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSink") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSink")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.actorPublisher")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Sink.actorSubscriber")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Source.actorPublisher")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Sink.actorSubscriber")
# #25045 adding Java/Scala interop to SourceQueue and SinkQueue # #25045 adding Java/Scala interop to SourceQueue and SinkQueue
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter")

View file

@ -7,7 +7,7 @@ package akka.stream.javadsl
import java.util.Optional import java.util.Optional
import akka.{ japi, Done, NotUsed } import akka.{ japi, Done, NotUsed }
import akka.actor.{ ActorRef, Props } import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.japi.function import akka.japi.function
import akka.stream.impl.LinearTraversalBuilder import akka.stream.impl.LinearTraversalBuilder
@ -264,19 +264,6 @@ object Sink {
new Sink( new Sink(
scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _)) scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply _))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(scaladsl.Sink.actorSubscriber(props))
/** /**
* A graph with the shape of a sink logically is a sink, this method makes * A graph with the shape of a sink logically is a sink, this method makes
* it so also in type. * it so also in type.

View file

@ -7,7 +7,7 @@ package akka.stream.javadsl
import java.util import java.util
import java.util.Optional import java.util.Optional
import akka.actor.{ ActorRef, Cancellable, Props } import akka.actor.{ ActorRef, Cancellable }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair, Util } import akka.japi.{ function, Pair, Util }
import akka.stream._ import akka.stream._
@ -286,19 +286,6 @@ object Source {
def asSubscriber[T](): Source[T, Subscriber[T]] = def asSubscriber[T](): Source[T, Subscriber[T]] =
new Source(scaladsl.Source.asSubscriber) new Source(scaladsl.Source.asSubscriber)
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
def actorPublisher[T](props: Props): Source[T, ActorRef] =
new Source(scaladsl.Source.actorPublisher(props))
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
* Messages sent to this actor will be emitted to the stream if there is demand from downstream, * Messages sent to this actor will be emitted to the stream if there is demand from downstream,

View file

@ -6,9 +6,8 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status } import akka.actor.{ ActorRef, Status }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
@ -17,7 +16,6 @@ import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import scala.collection.immutable import scala.collection.immutable
@ -518,21 +516,6 @@ object Sink {
onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] = onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage) actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorSubscriber]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
fromGraph(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
}
/** /**
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]]. * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]].
* [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. * [[akka.stream.scaladsl.SinkQueueWithCancel.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.

View file

@ -6,9 +6,8 @@ package akka.stream.scaladsl
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import akka.actor.{ ActorRef, Cancellable, Props } import akka.actor.{ ActorRef, Cancellable }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages._ import akka.stream.impl.fusing.GraphStages._
@ -457,21 +456,6 @@ object Source {
def asSubscriber[T]: Source[T, Subscriber[T]] = def asSubscriber[T]: Source[T, Subscriber[T]] =
fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))
/**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorPublisher]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated(
"Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.",
since = "2.5.0")
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
fromGraph(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
}
/** /**
* INTERNAL API * INTERNAL API
* *