=str #16957 Refactor actor creation api for sinks and sources

* also solves #16952, dispatcher for ActorPublisher
This commit is contained in:
Patrik Nordwall 2015-04-10 14:39:48 +02:00
parent 386ff80a0e
commit 4fcd3d0392
7 changed files with 149 additions and 67 deletions

View file

@ -4,22 +4,22 @@
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ActorRef, Props }
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.OperationAttributes
import akka.stream.{ Inlet, Shape, SinkShape }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ Future, Promise }
import akka.stream.MaterializationContext
import akka.stream.ActorFlowMaterializer
/**
* INTERNAL API
*/
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {
def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In] @uncheckedVariance, Mat)
def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat)
override def replaceShape(s: Shape): Module =
if (s == shape) this
@ -55,7 +55,7 @@ private[akka] class PublisherSink[In](val attributes: OperationAttributes, shape
override def toString: String = "PublisherSink"
override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In], Publisher[In]) = {
override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val pub = new VirtualPublisher[In]
val sub = new VirtualSubscriber[In](pub)
(sub, pub)
@ -75,9 +75,11 @@ private[akka] final class FanoutPublisherSink[In](
shape: SinkShape[In])
extends SinkModule[In, Publisher[In]](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In], Publisher[In]) = {
val fanoutActor = materializer.actorOf(
Props(new FanoutProcessorImpl(materializer.settings, initialBufferSize, maximumBufferSize)), s"$flowName-fanoutPublisher")
override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer)
val fanoutActor = actorMaterializer.actorOf(context,
Props(new FanoutProcessorImpl(actorMaterializer.effectiveSettings(context.effectiveAttributes),
initialBufferSize, maximumBufferSize)))
val fanoutProcessor = ActorProcessorFactory[In, In](fanoutActor)
(fanoutProcessor, fanoutProcessor)
}
@ -127,7 +129,7 @@ private[akka] object HeadSink {
*/
private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
override def create(context: MaterializationContext) = {
val p = Promise[In]()
val sub = new HeadSink.HeadSinkSubscriber[In](p)
(sub, p.future)
@ -146,8 +148,11 @@ private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: Sin
*/
private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) =
(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize), ())
override def create(context: MaterializationContext) = {
val effectiveSettings = ActorFlowMaterializer.downcast(context.materializer)
.effectiveSettings(context.effectiveAttributes)
(new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize), ())
}
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new BlackholeSink(attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, amendShape(attr))
@ -159,7 +164,7 @@ private[akka] final class BlackholeSink(val attributes: OperationAttributes, sha
*/
private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (subscriber, ())
override def create(context: MaterializationContext) = (subscriber, ())
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr))
@ -171,7 +176,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
*/
private[akka] final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[Any], Unit) = {
override def create(context: MaterializationContext): (Subscriber[Any], Unit) = {
val subscriber = new Subscriber[Any] {
override def onError(t: Throwable): Unit = ()
override def onSubscribe(s: Subscription): Unit = s.cancel()
@ -192,8 +197,8 @@ private[akka] final class CancelSink(val attributes: OperationAttributes, shape:
*/
private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val subscriberRef = materializer.actorOf(props, name = s"$flowName-actorSubscriber")
override def create(context: MaterializationContext) = {
val subscriberRef = ActorFlowMaterializer.downcast(context.materializer).actorOf(context, props)
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}
@ -208,10 +213,11 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
val attributes: OperationAttributes,
shape: SinkShape[In]) extends SinkModule[In, Unit](shape) {
override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = {
val subscriberRef = materializer.actorOf(
ActorRefSinkActor.props(ref, materializer.settings.maxInputBufferSize, onCompleteMessage),
name = s"$flowName-actorRef")
override def create(context: MaterializationContext) = {
val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer)
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
val subscriberRef = actorMaterializer.actorOf(context,
ActorRefSinkActor.props(ref, effectiveSettings.maxInputBufferSize, onCompleteMessage))
(akka.stream.actor.ActorSubscriber[In](subscriberRef), ())
}