diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 7d790c457f..1037087c06 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -15,7 +15,7 @@ trait GraphApply { val builder = new GraphDSL.Builder val s = buildBlock(builder) - new GraphApply.GraphImpl(s, builder.traversalBuilder) + createGraph(s, builder) } /** @@ -27,7 +27,7 @@ trait GraphApply { val s1 = builder.add(g1, Keep.right) val s = buildBlock(builder)(s1) - new GraphApply.GraphImpl(s, builder.traversalBuilder) + createGraph(s, builder) } @@ -45,25 +45,11 @@ trait GraphApply { ] val s = buildBlock(builder)([#s1#]) - new GraphApply.GraphImpl(s, builder.traversalBuilder) + createGraph(s, builder) }# ] + + private def createGraph[S <: Shape, Mat](shape: S, graphBuilder: GraphDSL.Builder[Mat]): Graph[S, Mat] = + new GenericGraph(shape, graphBuilder.traversalBuilder) } - -/** - * INTERNAL API - */ -object GraphApply { - final class GraphImpl[S <: Shape, Mat](override val shape: S, override val traversalBuilder: TraversalBuilder) - extends Graph[S, Mat] { - - override def toString: String = s"Graph($shape)" - - override def withAttributes(attr: Attributes): Graph[S, Mat] = - new GraphImpl(shape, traversalBuilder.setAttributes(attr)) - - override def named(name: String): Graph[S, Mat] = addAttributes(Attributes.name(name)) - } -} - diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 42c0bbdb62..23aa573e26 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -17,6 +17,37 @@ import scala.collection.immutable import scala.concurrent.Promise import scala.util.control.{ NoStackTrace, NonFatal } +/** + * INTERNAL API + * + * The implementation of a graph with an arbitrary shape. + */ +private[stream] final class GenericGraph[S <: Shape, Mat](override val shape: S, override val traversalBuilder: TraversalBuilder) + extends Graph[S, Mat] { outer ⇒ + + override def toString: String = s"GenericGraph($shape)" + + override def withAttributes(attr: Attributes): Graph[S, Mat] = + new GenericGraphWithChangedAttributes(shape, traversalBuilder, attr) +} + +/** + * INTERNAL API + * + * The implementation of a graph with an arbitrary shape with changed attributes. Changing attributes again + * prevents building up a chain of changes. + */ +private[stream] final class GenericGraphWithChangedAttributes[S <: Shape, Mat](override val shape: S, originalTraversalBuilder: TraversalBuilder, newAttributes: Attributes) + extends Graph[S, Mat] { outer ⇒ + + private[stream] def traversalBuilder: TraversalBuilder = originalTraversalBuilder.setAttributes(newAttributes) + + override def toString: String = s"GenericGraphWithChangedAttributes($shape)" + + override def withAttributes(attr: Attributes): Graph[S, Mat] = + new GenericGraphWithChangedAttributes(shape, originalTraversalBuilder, attr) +} + object Merge { /** * Create a new `Merge` with the specified number of input ports. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 911775ee2a..5586cb75c7 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -20,6 +20,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration import akka.stream.actor.ActorSubscriberMessage +import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { @@ -33,12 +34,8 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, TraversalBuilder.atomic(GraphStageModule(shape, attr, this), attr) } - final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { - override def shape = GraphStageWithMaterializedValue.this.shape - override def traversalBuilder = GraphStageWithMaterializedValue.this.traversalBuilder.setAttributes(attr) - - override def withAttributes(attr: Attributes) = GraphStageWithMaterializedValue.this.withAttributes(attr) - } + final override def withAttributes(attr: Attributes): Graph[S, M] = + new GenericGraphWithChangedAttributes(shape, GraphStageWithMaterializedValue.this.traversalBuilder, attr) } /**