Final removal of ActorPublisher and ActorSubscriber, #26187

* some messages from ActorSubscriber were also used in other places,
  so moved those to impl.ActorSubscriberMessage
* WatermarkRequestStrategy used by SourceRefImp, so moved there
This commit is contained in:
Patrik Nordwall 2019-09-13 15:19:20 +02:00
parent a84aa0095c
commit db132cd216
15 changed files with 435 additions and 861 deletions

View file

@ -7,8 +7,6 @@ package akka.stream.impl
import java.util.function.BinaryOperator
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
@ -165,28 +163,6 @@ import scala.util.control.NonFatal
override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
@InternalApi private[akka] final class ActorSubscriberSink[In](
props: Props,
val attributes: Attributes,
shape: SinkShape[In])
extends SinkModule[In, ActorRef](shape) {
override def create(context: MaterializationContext) = {
val subscriberRef = context.materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] =
new ActorSubscriberSink[In](props, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, ActorRef] =
new ActorSubscriberSink[In](props, attr, amendShape(attr))
}
/**
* INTERNAL API
*/