=str refactor GraphApply.GraphImpl to be reused at more places

This commit is contained in:
Johannes Rudolph 2017-03-07 16:08:44 +01:00
parent 45ccd4ffbe
commit 7677cc3b38
3 changed files with 40 additions and 26 deletions

View file

@ -15,7 +15,7 @@ trait GraphApply {
val builder = new GraphDSL.Builder
val s = buildBlock(builder)
new GraphApply.GraphImpl(s, builder.traversalBuilder)
createGraph(s, builder)
}
/**
@ -27,7 +27,7 @@ trait GraphApply {
val s1 = builder.add(g1, Keep.right)
val s = buildBlock(builder)(s1)
new GraphApply.GraphImpl(s, builder.traversalBuilder)
createGraph(s, builder)
}
@ -45,25 +45,11 @@ trait GraphApply {
]
val s = buildBlock(builder)([#s1#])
new GraphApply.GraphImpl(s, builder.traversalBuilder)
createGraph(s, builder)
}#
]
private def createGraph[S <: Shape, Mat](shape: S, graphBuilder: GraphDSL.Builder[Mat]): Graph[S, Mat] =
new GenericGraph(shape, graphBuilder.traversalBuilder)
}
/**
* INTERNAL API
*/
object GraphApply {
final class GraphImpl[S <: Shape, Mat](override val shape: S, override val traversalBuilder: TraversalBuilder)
extends Graph[S, Mat] {
override def toString: String = s"Graph($shape)"
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GraphImpl(shape, traversalBuilder.setAttributes(attr))
override def named(name: String): Graph[S, Mat] = addAttributes(Attributes.name(name))
}
}

View file

@ -17,6 +17,37 @@ import scala.collection.immutable
import scala.concurrent.Promise
import scala.util.control.{ NoStackTrace, NonFatal }
/**
* INTERNAL API
*
* The implementation of a graph with an arbitrary shape.
*/
private[stream] final class GenericGraph[S <: Shape, Mat](override val shape: S, override val traversalBuilder: TraversalBuilder)
extends Graph[S, Mat] { outer
override def toString: String = s"GenericGraph($shape)"
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GenericGraphWithChangedAttributes(shape, traversalBuilder, attr)
}
/**
* INTERNAL API
*
* The implementation of a graph with an arbitrary shape with changed attributes. Changing attributes again
* prevents building up a chain of changes.
*/
private[stream] final class GenericGraphWithChangedAttributes[S <: Shape, Mat](override val shape: S, originalTraversalBuilder: TraversalBuilder, newAttributes: Attributes)
extends Graph[S, Mat] { outer
private[stream] def traversalBuilder: TraversalBuilder = originalTraversalBuilder.setAttributes(newAttributes)
override def toString: String = s"GenericGraphWithChangedAttributes($shape)"
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GenericGraphWithChangedAttributes(shape, originalTraversalBuilder, attr)
}
object Merge {
/**
* Create a new `Merge` with the specified number of input ports.

View file

@ -20,6 +20,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes }
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {
@ -33,12 +34,8 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
TraversalBuilder.atomic(GraphStageModule(shape, attr, this), attr)
}
final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] {
override def shape = GraphStageWithMaterializedValue.this.shape
override def traversalBuilder = GraphStageWithMaterializedValue.this.traversalBuilder.setAttributes(attr)
override def withAttributes(attr: Attributes) = GraphStageWithMaterializedValue.this.withAttributes(attr)
}
final override def withAttributes(attr: Attributes): Graph[S, M] =
new GenericGraphWithChangedAttributes(shape, GraphStageWithMaterializedValue.this.traversalBuilder, attr)
}
/**