Rewrite actor ref sink as a graph stage (#27267)

This commit is contained in:
Christopher Batey 2019-07-05 12:20:36 +01:00 committed by Patrik Nordwall
parent b006567751
commit c564a0ff54
6 changed files with 88 additions and 91 deletions

View file

@ -184,32 +184,6 @@ import org.reactivestreams.Subscriber
new ActorSubscriberSink[In](props, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ActorRefSink[In](
ref: ActorRef,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any,
val attributes: Attributes,
shape: SinkShape[In])
extends SinkModule[In, NotUsed](shape) {
override def create(context: MaterializationContext) = {
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
val subscriberRef = actorMaterializer.actorOf(
context,
ActorRefSinkActor.props(ref, maxInputBufferSize, onCompleteMessage, onFailureMessage))
(akka.stream.actor.ActorSubscriber[In](subscriberRef), NotUsed)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, onFailureMessage, attr, amendShape(attr))
}
/**
* INTERNAL API
*/