diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 0369327bb3..620fb828c8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -14,21 +14,32 @@ import akka.testkit.EventFilter import akka.testkit.ImplicitSender import akka.testkit.TestEvent.Mute import akka.testkit.TestProbe - import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.stream.impl.SubscriberSink +import akka.stream.ActorFlowMaterializerSettings object ActorPublisherSpec { - def testPublisherProps(probe: ActorRef): Props = - Props(new TestPublisher(probe)).withDispatcher("akka.test.stream-dispatcher") + val config = + s""" + my-dispatcher1 = $${akka.test.stream-dispatcher} + my-dispatcher2 = $${akka.test.stream-dispatcher} + """ + + def testPublisherProps(probe: ActorRef, useTestDispatcher: Boolean = true): Props = { + val p = Props(new TestPublisher(probe)) + if (useTestDispatcher) p.withDispatcher("akka.test.stream-dispatcher") + else p + } case class TotalDemand(elements: Long) case class Produce(elem: String) case class Err(reason: String) case object Boom case object Complete + case object ThreadName class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { import akka.stream.actor.ActorPublisherMessage._ @@ -39,6 +50,7 @@ object ActorPublisherSpec { case Err(reason) ⇒ onError(new RuntimeException(reason) with NoStackTrace) case Complete ⇒ onComplete() case Boom ⇒ throw new RuntimeException("boom") with NoStackTrace + case ThreadName ⇒ probe ! Thread.currentThread.getName } } @@ -114,7 +126,7 @@ object ActorPublisherSpec { } -class ActorPublisherSpec extends AkkaSpec with ImplicitSender { +class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with ImplicitSender { import akka.stream.actor.ActorPublisherSpec._ @@ -349,6 +361,35 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { } } + "use dispatcher from materializer settings" in { + implicit val materializer = ActorFlowMaterializer( + ActorFlowMaterializerSettings(system).withDispatcher("my-dispatcher1")) + val s = StreamTestKit.SubscriberProbe[String]() + val ref = Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)).to(Sink(s)).run() + ref ! ThreadName + expectMsgType[String] should include("my-dispatcher1") + } + + "use dispatcher from operation attributes" in { + implicit val materializer = ActorFlowMaterializer() + val s = StreamTestKit.SubscriberProbe[String]() + val ref = Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)) + .withAttributes(OperationAttributes.dispatcher("my-dispatcher1")) + .to(Sink(s)).run() + ref ! ThreadName + expectMsgType[String] should include("my-dispatcher1") + } + + "use dispatcher from props" in { + implicit val materializer = ActorFlowMaterializer() + val s = StreamTestKit.SubscriberProbe[String]() + val ref = Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false).withDispatcher("my-dispatcher1")) + .withAttributes(OperationAttributes.dispatcher("my-dispatcher2")) + .to(Sink(s)).run() + ref ! ThreadName + expectMsgType[String] should include("my-dispatcher1") + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index a71697d4d6..32321eb130 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -14,6 +14,7 @@ import akka.actor.Props import akka.actor.ActorRef import akka.stream.javadsl.japi import scala.concurrent.ExecutionContextExecutor +import akka.stream.scaladsl.OperationAttributes object ActorFlowMaterializer { @@ -121,6 +122,16 @@ object ActorFlowMaterializer { system } + /** + * INTERNAL API + */ + private[akka] def downcast(materializer: FlowMaterializer): ActorFlowMaterializer = + materializer match { + case m: ActorFlowMaterializer ⇒ m + case _ ⇒ throw new IllegalArgumentException(s"required [${classOf[ActorFlowMaterializer].getName}] " + + s"but got [${materializer.getClass.getName}]") + } + } /** @@ -134,10 +145,12 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { def settings: ActorFlowMaterializerSettings + def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings + /** - * INTERNAL API + * INTERNAL API: this might become public later */ - private[akka] def actorOf(props: Props, name: String): ActorRef + private[akka] def actorOf(context: MaterializationContext, props: Props): ActorRef } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 1183a05402..238c4ac4e1 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -5,6 +5,7 @@ package akka.stream import scala.concurrent.ExecutionContextExecutor import akka.japi +import akka.stream.scaladsl.OperationAttributes abstract class FlowMaterializer { @@ -44,3 +45,13 @@ private[akka] object NoFlowMaterializer extends FlowMaterializer { override def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException("NoFlowMaterializer does not provide an ExecutionContext") } + +/** + * INTERNAL API: this might become public later + * + * Context parameter to the `create` methods of sources and sinks. + */ +private[akka] case class MaterializationContext( + materializer: FlowMaterializer, + effectiveAttributes: OperationAttributes, + stageName: String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 2be22b244e..c2c718d3be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -19,17 +19,6 @@ import org.reactivestreams._ import scala.concurrent.{ Await, ExecutionContextExecutor } -object ActorFlowMaterializerImpl { - import OperationAttributes._ - private[akka] def calcSettings(opAttr: OperationAttributes)(settings: ActorFlowMaterializerSettings): ActorFlowMaterializerSettings = - opAttr.attributes.collect { - case InputBuffer(initial, max) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) ⇒ (s: ActorFlowMaterializerSettings) ⇒ - s.withSupervisionStrategy(decider) - }.reduceOption(_ andThen _).getOrElse((x: ActorFlowMaterializerSettings) ⇒ x)(settings) // FIXME is this the optimal way of encoding this? -} - /** * INTERNAL API */ @@ -43,12 +32,24 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF import ActorFlowMaterializerImpl._ import akka.stream.impl.Stages._ - def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) + override def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + override def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings = { + import OperationAttributes._ + opAttr.attributes.foldLeft(settings) { (s, attr) ⇒ + attr match { + case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) + case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher) + case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider) + case Name(_) ⇒ s + } + } + } + override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = { runnableFlow.module.validate() @@ -63,23 +64,26 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF override protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any = { + def newMaterializationContext() = new MaterializationContext(ActorFlowMaterializerImpl.this, + effectiveAttributes, stageName(effectiveAttributes)) + atomic match { case sink: SinkModule[_, _] ⇒ - val (sub, mat) = sink.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) + val (sub, mat) = sink.create(newMaterializationContext()) assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]]) mat case source: SourceModule[_, _] ⇒ - val (pub, mat) = source.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) + val (pub, mat) = source.create(newMaterializationContext()) assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]]) mat case stage: StageModule ⇒ - val (processor, mat) = processorFor(stage, effectiveAttributes, calcSettings(effectiveAttributes)(settings)) + val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) assignPort(stage.inPort, processor) assignPort(stage.outPort, processor) mat - case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes, calcSettings(effectiveAttributes)(settings)) + case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes)) } } @@ -166,13 +170,17 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF session.materialize().asInstanceOf[Mat] } - lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { + override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId case other ⇒ other }) - private[akka] def actorOf(props: Props, name: String): ActorRef = - actorOf(props, name, settings.dispatcher) + override def actorOf(context: MaterializationContext, props: Props): ActorRef = { + val dispatcher = + if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher + else props.dispatcher + actorOf(props, context.stageName, dispatcher) + } private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = supervisor match { case ref: LocalActorRef ⇒ @@ -237,11 +245,11 @@ private[akka] object ActorProcessorFactory { private val _identity = (x: Any) ⇒ x - def props(materializer: ActorFlowMaterializerImpl, op: StageModule, parentAttributes: OperationAttributes): (Props, Any) = { + def props(materializer: ActorFlowMaterializer, op: StageModule, parentAttributes: OperationAttributes): (Props, Any) = { val att = parentAttributes and op.attributes // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW // Also, otherwise the attributes will not affect the settings properly! - val settings = calcSettings(att)(materializer.settings) + val settings = materializer.effectiveSettings(att) op match { case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer), ()) case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 161ccb7214..9e89728751 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -4,22 +4,22 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - import akka.actor.{ ActorRef, Props } import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.OperationAttributes import akka.stream.{ Inlet, Shape, SinkShape } import org.reactivestreams.{ Publisher, Subscriber, Subscription } - import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Future, Promise } +import akka.stream.MaterializationContext +import akka.stream.ActorFlowMaterializer /** * INTERNAL API */ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { - def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In] @uncheckedVariance, Mat) + def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat) override def replaceShape(s: Shape): Module = if (s == shape) this @@ -55,7 +55,7 @@ private[akka] class PublisherSink[In](val attributes: OperationAttributes, shape override def toString: String = "PublisherSink" - override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In], Publisher[In]) = { + override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { val pub = new VirtualPublisher[In] val sub = new VirtualSubscriber[In](pub) (sub, pub) @@ -75,9 +75,11 @@ private[akka] final class FanoutPublisherSink[In]( shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[In], Publisher[In]) = { - val fanoutActor = materializer.actorOf( - Props(new FanoutProcessorImpl(materializer.settings, initialBufferSize, maximumBufferSize)), s"$flowName-fanoutPublisher") + override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { + val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) + val fanoutActor = actorMaterializer.actorOf(context, + Props(new FanoutProcessorImpl(actorMaterializer.effectiveSettings(context.effectiveAttributes), + initialBufferSize, maximumBufferSize))) val fanoutProcessor = ActorProcessorFactory[In, In](fanoutActor) (fanoutProcessor, fanoutProcessor) } @@ -127,7 +129,7 @@ private[akka] object HeadSink { */ private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { + override def create(context: MaterializationContext) = { val p = Promise[In]() val sub = new HeadSink.HeadSinkSubscriber[In](p) (sub, p.future) @@ -146,8 +148,11 @@ private[akka] class HeadSink[In](val attributes: OperationAttributes, shape: Sin */ private[akka] final class BlackholeSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = - (new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize), ()) + override def create(context: MaterializationContext) = { + val effectiveSettings = ActorFlowMaterializer.downcast(context.materializer) + .effectiveSettings(context.effectiveAttributes) + (new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize), ()) + } override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Unit] = new BlackholeSink(attributes, shape) override def withAttributes(attr: OperationAttributes): Module = new BlackholeSink(attr, amendShape(attr)) @@ -159,7 +164,7 @@ private[akka] final class BlackholeSink(val attributes: OperationAttributes, sha */ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (subscriber, ()) + override def create(context: MaterializationContext) = (subscriber, ()) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Unit] = new SubscriberSink[In](subscriber, attributes, shape) override def withAttributes(attr: OperationAttributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr)) @@ -171,7 +176,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att */ private[akka] final class CancelSink(val attributes: OperationAttributes, shape: SinkShape[Any]) extends SinkModule[Any, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Subscriber[Any], Unit) = { + override def create(context: MaterializationContext): (Subscriber[Any], Unit) = { val subscriber = new Subscriber[Any] { override def onError(t: Throwable): Unit = () override def onSubscribe(s: Subscription): Unit = s.cancel() @@ -192,8 +197,8 @@ private[akka] final class CancelSink(val attributes: OperationAttributes, shape: */ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val subscriberRef = materializer.actorOf(props, name = s"$flowName-actorSubscriber") + override def create(context: MaterializationContext) = { + val subscriberRef = ActorFlowMaterializer.downcast(context.materializer).actorOf(context, props) (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef) } @@ -208,10 +213,11 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any val attributes: OperationAttributes, shape: SinkShape[In]) extends SinkModule[In, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val subscriberRef = materializer.actorOf( - ActorRefSinkActor.props(ref, materializer.settings.maxInputBufferSize, onCompleteMessage), - name = s"$flowName-actorRef") + override def create(context: MaterializationContext) = { + val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) + val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) + val subscriberRef = actorMaterializer.actorOf(context, + ActorRefSinkActor.props(ref, effectiveSettings.maxInputBufferSize, onCompleteMessage)) (akka.stream.actor.ActorSubscriber[In](subscriberRef), ()) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 201294f3d6..fef1eed7c7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -4,24 +4,24 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean - import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.OperationAttributes import akka.stream.{ Outlet, OverflowStrategy, Shape, SourceShape } import org.reactivestreams._ - import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success } +import akka.stream.MaterializationContext +import akka.stream.ActorFlowMaterializer /** * INTERNAL API */ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module { - def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[Out] @uncheckedVariance, Mat) + def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat) override def replaceShape(s: Shape): Module = if (s == shape) this @@ -54,7 +54,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out */ private[akka] final class SubscriberSource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[Out], Subscriber[Out]) = { + override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = { val processor = new Processor[Out, Out] { @volatile private var subscriber: Subscriber[_ >: Out] = null @@ -81,7 +81,7 @@ private[akka] final class SubscriberSource[Out](val attributes: OperationAttribu * back-pressure upstream. */ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = (p, ()) + override def create(context: MaterializationContext) = (p, ()) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new PublisherSource[Out](p, attributes, shape) override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) @@ -95,15 +95,17 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes * The stream terminates with an error if the `Future` is completed with a failure. */ private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = + override def create(context: MaterializationContext) = future.value match { case Some(Success(element)) ⇒ - (SynchronousIterablePublisher(List(element), s"$flowName-0-synciterable"), ()) // Option is not Iterable. sigh + (SynchronousIterablePublisher(List(element), context.stageName), ()) // Option is not Iterable. sigh case Some(Failure(t)) ⇒ - (ErrorPublisher(t, s"$flowName-0-error").asInstanceOf[Publisher[Out]], ()) + (ErrorPublisher(t, context.stageName).asInstanceOf[Publisher[Out]], ()) case None ⇒ - (ActorPublisher[Out](materializer.actorOf(FuturePublisher.props(future, materializer.settings), - name = s"$flowName-0-future")), ()) // FIXME this does not need to be an actor + val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) + val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) + (ActorPublisher[Out](actorMaterializer.actorOf(context, + FuturePublisher.props(future, effectiveSettings))), ()) // FIXME this does not need to be an actor } override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape) @@ -116,7 +118,7 @@ private[akka] final class FutureSource[Out](future: Future[Out], val attributes: private[akka] final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Unit]](shape) { import ReactiveStreamsCompliance._ - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { + override def create(context: MaterializationContext) = { val p = Promise[Unit]() val pub = new Publisher[Unit] { @@ -129,7 +131,7 @@ private[akka] final class LazyEmptySource[Out](val attributes: OperationAttribut p.future.onComplete { case Success(_) ⇒ tryOnComplete(s) case Failure(ex) ⇒ tryOnError(s, ex) // due to external signal - }(materializer.executionContext) + }(context.materializer.executionContext) } } @@ -150,11 +152,12 @@ private[akka] final class LazyEmptySource[Out](val attributes: OperationAttribut */ private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { + override def create(context: MaterializationContext) = { val cancelled = new AtomicBoolean(false) - val ref = - materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings, cancelled), - name = s"$flowName-0-tick") + val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) + val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) + val ref = actorMaterializer.actorOf(context, + TickPublisher.props(initialDelay, interval, tick, effectiveSettings, cancelled)) (ActorPublisher[Out](ref), new Cancellable { override def cancel(): Boolean = { if (!isCancelled) ref ! PoisonPill @@ -175,8 +178,8 @@ private[akka] final class TickSource[Out](initialDelay: FiniteDuration, interval */ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val publisherRef = materializer.actorOf(props, name = s"$flowName-0-actorPublisher") + override def create(context: MaterializationContext) = { + val publisherRef = ActorFlowMaterializer.downcast(context.materializer).actorOf(context, props) (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) } @@ -191,9 +194,9 @@ private[akka] final class ActorRefSource[Out]( bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { - val ref = materializer.actorOf(ActorRefSourceActor.props(bufferSize, overflowStrategy), - name = s"$flowName-0-actorRef") + override def create(context: MaterializationContext) = { + val ref = ActorFlowMaterializer.downcast(context.materializer).actorOf(context, + ActorRefSourceActor.props(bufferSize, overflowStrategy)) (akka.stream.actor.ActorPublisher[Out](ref), ref) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala index 864e510754..c8782246f4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala @@ -87,7 +87,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { val attributes: OperationAttributes, _shape: SourceShape[IncomingConnection]) extends SourceModule[IncomingConnection, Future[ServerBinding]](_shape) { - override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[IncomingConnection], Future[ServerBinding]) = { + override def create(context: MaterializationContext): (Publisher[IncomingConnection], Future[ServerBinding]) = { val localAddressPromise = Promise[InetSocketAddress]() val unbindPromise = Promise[() ⇒ Future[Unit]]() val publisher = new Publisher[IncomingConnection] {