diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index dfad132580..a12948ae57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -13,7 +13,7 @@ import akka.stream._ import akka.stream.impl.fusing.GraphInterpreterShell import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Await, ExecutionContextExecutor} +import scala.concurrent.{ Await, ExecutionContextExecutor } /** * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 2e9d64785e..ff74cdbbc8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -24,6 +24,9 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.collection.immutable.Map import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor +import scala.annotation.tailrec +import akka.stream.impl.fusing.GraphInterpreter.DownstreamBoundaryStageLogic +import akka.stream.impl.fusing.GraphInterpreter.UpstreamBoundaryStageLogic object PhasedFusingActorMaterializer { @@ -656,11 +659,23 @@ final class GraphStageIsland( case _ ⇒ val props = ActorGraphInterpreter.props(shell) .withDispatcher(effectiveSettings.dispatcher) - - materializer.actorOf(props, islandName) + materializer.actorOf(props, fullIslandName) } } + private def fullIslandName: String = { + @tailrec def findUsefulName(i: Int): String = { + if (i == logics.size) islandName + else logics.get(i) match { + case _: DownstreamBoundaryStageLogic[_] | _: UpstreamBoundaryStageLogic[_] ⇒ + findUsefulName(i + 1) + case _ ⇒ + islandName + "-" + logics.get(i).attributes.nameOrDefault() + } + } + findUsefulName(0) + } + override def toString: String = "GraphStagePhase" } @@ -714,8 +729,8 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandN override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { subscriberOrVirtualPublisher match { - case v: VirtualPublisher[Any] ⇒ v.registerPublisher(publisher) - case s: Subscriber[Any] ⇒ publisher.subscribe(s) + case v: VirtualPublisher[_] ⇒ v.registerPublisher(publisher) + case s: Subscriber[Any] @unchecked ⇒ publisher.subscribe(s) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 2c9848f901..0c9fc7f3b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -357,5 +357,5 @@ final case class ProcessorModule[In, Out, Mat]( override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]" override private[stream] def traversalBuilder = - LinearTraversalBuilder.fromModule(this).makeIsland(ProcessorModuleIslandTag) + LinearTraversalBuilder.fromModule(this, attributes).makeIsland(ProcessorModuleIslandTag) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 2249970e36..da1f835f1c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -156,23 +156,26 @@ object TraversalBuilder { /** * Create a generic traversal builder starting from an atomic module. */ - def atomic(module: AtomicModule[Shape, Any], attributes: Attributes = Attributes.none): TraversalBuilder = { + def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = { initShape(module.shape) - if (module.shape.outlets.isEmpty) { - val b = CompletedTraversalBuilder( - traversalSoFar = MaterializeAtomic(module, Array.ofDim[Int](module.shape.outlets.size)), - inSlots = module.shape.inlets.size, - inToOffset = module.shape.inlets.map(in ⇒ in → in.id).toMap, - attributes) - b - } else { - AtomicTraversalBuilder( - module, - Array.ofDim[Int](module.shape.outlets.size), - module.shape.outlets.size, - attributes) - } + val builder = + if (module.shape.outlets.isEmpty) { + val b = CompletedTraversalBuilder( + traversalSoFar = MaterializeAtomic(module, Array.ofDim[Int](module.shape.outlets.size)), + inSlots = module.shape.inlets.size, + inToOffset = module.shape.inlets.map(in ⇒ in → in.id).toMap, + Attributes.none) + b + } else { + AtomicTraversalBuilder( + module, + Array.ofDim[Int](module.shape.outlets.size), + module.shape.outlets.size, + Attributes.none) + } + // important to use setAttributes because it will create island for async (dispatcher attribute) + builder.setAttributes(attributes) } def printTraversal(t: Traversal, indent: Int = 0): Unit = { @@ -477,7 +480,7 @@ object LinearTraversalBuilder { * Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight * than its generic counterpart. It can be freely mixed with the generic builder in both ways. */ - def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes = Attributes.none): LinearTraversalBuilder = { + def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = { require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.") require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") TraversalBuilder.initShape(module.shape) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 090d625b3d..0b2931e125 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -36,7 +36,7 @@ final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M]( if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage) else this - override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this) + override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this, attributes) override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 33e4cef06a..0bf03b351f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -28,7 +28,7 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]" - override private[stream] def traversalBuilder = TraversalBuilder.atomic(this).makeIsland(TlsModuleIslandTag) + override private[stream] def traversalBuilder = TraversalBuilder.atomic(this, attributes).makeIsland(TlsModuleIslandTag) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5c6bfc374a..8912ebbc8c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -30,8 +30,7 @@ import scala.compat.java8.FutureConverters._ */ final class Source[+Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, - override val shape: SourceShape[Out] -) + override val shape: SourceShape[Out]) extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { override type Repr[+O] = Source[O, Mat @uncheckedVariance] @@ -53,8 +52,7 @@ final class Source[+Out, +Mat]( new Source[T, Mat3]( traversalBuilder.append(toAppend, flow.shape, combine), - SourceShape(flow.shape.out) - ) + SourceShape(flow.shape.out)) } /** @@ -157,8 +155,7 @@ final class Source[+Out, +Mat]( */ override def async: Repr[Out] = new Source( traversalBuilder.makeIsland(GraphStageTag), - shape - ) + shape) /** * Converts this Scala DSL element to it's Java DSL counterpart. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 5d331c2ca5..911775ee2a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -28,7 +28,10 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, protected def initialAttributes: Attributes = Attributes.none - final override lazy val traversalBuilder: TraversalBuilder = TraversalBuilder.atomic(GraphStageModule(shape, initialAttributes, this)) + final override lazy val traversalBuilder: TraversalBuilder = { + val attr = initialAttributes + TraversalBuilder.atomic(GraphStageModule(shape, attr, this), attr) + } final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { override def shape = GraphStageWithMaterializedValue.this.shape