Merge pull request #22513 from jrudolph/jr/w/remove-most-asInstanceOfs-in-TraversalBuilder

Small improvements to TraversalBuilder
This commit is contained in:
Johannes Rudolph 2017-03-13 09:43:55 +01:00 committed by GitHub
commit d5660acb64
5 changed files with 58 additions and 39 deletions

View file

@ -428,7 +428,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
"keep mapped materialized value of empty builder" in { "keep mapped materialized value of empty builder" in {
val builder = val builder =
LinearTraversalBuilder.empty() LinearTraversalBuilder.empty()
.transformMat[NotUsed, String](_ "NOTUSED") .transformMat((_: Any) "NOTUSED")
.append(source.traversalBuilder, source.shape, Keep.left) .append(source.traversalBuilder, source.shape, Keep.left)
.append(sink.traversalBuilder, sink.shape, Keep.left) .append(sink.traversalBuilder, sink.shape, Keep.left)
@ -556,7 +556,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
val builder = source.traversalBuilder val builder = source.traversalBuilder
.append(flow1.traversalBuilder, flow1.shape, Keep.right) .append(flow1.traversalBuilder, flow1.shape, Keep.right)
.append(sink.traversalBuilder, sink.shape, Keep.left) .append(sink.traversalBuilder, sink.shape, Keep.left)
.transformMat[String, String]("MAPPED: " + _) .transformMat("MAPPED: " + (_: String))
val mat = testMaterialize(builder) val mat = testMaterialize(builder)
@ -566,12 +566,12 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
"properly map materialized value (nested)" in { "properly map materialized value (nested)" in {
val flowBuilder = val flowBuilder =
flow1.traversalBuilder flow1.traversalBuilder
.transformMat[String, String]("M1: " + _) .transformMat("M1: " + (_: String))
val builder = source.traversalBuilder val builder = source.traversalBuilder
.append(flowBuilder, flow1.shape, Keep.right) .append(flowBuilder, flow1.shape, Keep.right)
.append(sink.traversalBuilder, sink.shape, Keep.left) .append(sink.traversalBuilder, sink.shape, Keep.left)
.transformMat[String, String]("M2: " + _) .transformMat("M2: " + (_: String))
val mat = testMaterialize(builder) val mat = testMaterialize(builder)

View file

@ -238,7 +238,7 @@ class TraversalBuilderSpec extends AkkaSpec {
"keep mapped materialized value of empty builder" in { "keep mapped materialized value of empty builder" in {
val builder = val builder =
TraversalBuilder.empty() TraversalBuilder.empty()
.transformMat[NotUsed, String](_ "NOTUSED") .transformMat((_: Any) "NOTUSED")
.add(source.traversalBuilder, source.shape, Keep.left) .add(source.traversalBuilder, source.shape, Keep.left)
.add(sink.traversalBuilder, sink.shape, Keep.left) .add(sink.traversalBuilder, sink.shape, Keep.left)
.wire(source.out, sink.in) .wire(source.out, sink.in)
@ -336,7 +336,7 @@ class TraversalBuilderSpec extends AkkaSpec {
.add(sink.traversalBuilder, sink.shape, Keep.left) .add(sink.traversalBuilder, sink.shape, Keep.left)
.wire(source.out, flow1.in) .wire(source.out, flow1.in)
.wire(flow1.out, sink.in) .wire(flow1.out, sink.in)
.transformMat[String, String]("MAPPED: " + _) .transformMat("MAPPED: " + (_: String))
val mat = testMaterialize(builder) val mat = testMaterialize(builder)
@ -346,14 +346,14 @@ class TraversalBuilderSpec extends AkkaSpec {
"properly map materialized value (nested)" in { "properly map materialized value (nested)" in {
val flowBuilder = val flowBuilder =
flow1.traversalBuilder flow1.traversalBuilder
.transformMat[String, String]("M1: " + _) .transformMat("M1: " + (_: String))
val builder = source.traversalBuilder val builder = source.traversalBuilder
.add(flowBuilder, flow1.shape, Keep.right) .add(flowBuilder, flow1.shape, Keep.right)
.add(sink.traversalBuilder, sink.shape, Keep.left) .add(sink.traversalBuilder, sink.shape, Keep.left)
.wire(source.out, flow1.in) .wire(source.out, flow1.in)
.wire(flow1.out, sink.in) .wire(flow1.out, sink.in)
.transformMat[String, String]("M2: " + _) .transformMat("M2: " + (_: String))
val mat = testMaterialize(builder) val mat = testMaterialize(builder)

View file

@ -148,14 +148,14 @@ object TraversalTestUtils {
matValueStack.removeLast() matValueStack.removeLast()
case PushNotUsed case PushNotUsed
matValueStack.addLast(NotUsed) matValueStack.addLast(NotUsed)
case Transform(f) case transform: Transform
val prev = matValueStack.removeLast() val prev = matValueStack.removeLast()
val result = f(prev) val result = transform(prev)
matValueStack.addLast(result) matValueStack.addLast(result)
case Compose(f) case compose: ComposeOp
val second = matValueStack.removeLast() val second = matValueStack.removeLast()
val first = matValueStack.removeLast() val first = matValueStack.removeLast()
val result = f(first, second) val result = compose(first, second)
matValueStack.addLast(result) matValueStack.addLast(result)
case PushAttributes(attr) case PushAttributes(attr)
attributesStack.addLast(attributesStack.getLast and attr) attributesStack.addLast(attributesStack.getLast and attr)

View file

@ -466,15 +466,15 @@ case class PhasedFusingActorMaterializer(
case PushNotUsed case PushNotUsed
matValueStack.addLast(NotUsed) matValueStack.addLast(NotUsed)
if (Debug) println(s"PUSH: NotUsed => $matValueStack") if (Debug) println(s"PUSH: NotUsed => $matValueStack")
case Transform(f) case transform: Transform
val prev = matValueStack.removeLast() val prev = matValueStack.removeLast()
val result = f(prev) val result = transform(prev)
matValueStack.addLast(result) matValueStack.addLast(result)
if (Debug) println(s"TRFM: $matValueStack") if (Debug) println(s"TRFM: $matValueStack")
case Compose(f) case compose: ComposeOp
val second = matValueStack.removeLast() val second = matValueStack.removeLast()
val first = matValueStack.removeLast() val first = matValueStack.removeLast()
val result = f(first, second) val result = compose(first, second)
matValueStack.addLast(result) matValueStack.addLast(result)
if (Debug) println(s"COMP: $matValueStack") if (Debug) println(s"COMP: $matValueStack")
case PushAttributes(attr) case PushAttributes(attr)

View file

@ -6,6 +6,7 @@ package akka.stream.impl
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.util.OptionVal import akka.util.OptionVal
import scala.language.existentials import scala.language.existentials
@ -117,8 +118,19 @@ sealed trait MaterializedValueOp extends Traversal
case object Pop extends MaterializedValueOp case object Pop extends MaterializedValueOp
case object PushNotUsed extends MaterializedValueOp case object PushNotUsed extends MaterializedValueOp
final case class Transform(mapper: Any Any) extends MaterializedValueOp final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
final case class Compose(composer: (Any, Any) Any) extends MaterializedValueOp def apply(arg: Any): Any = mapper.asInstanceOf[Any Any](arg)
}
trait ComposeOp extends MaterializedValueOp {
def apply(arg1: Any, arg2: Any): Any
}
final case class Compose(composer: AnyFunction2) extends ComposeOp {
def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) Any](arg1, arg2)
}
/** An optimization which applies the arguments in reverse order */
final case class ComposeReversed(composer: AnyFunction2) extends ComposeOp {
def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) Any](arg2, arg1)
}
final case class PushAttributes(attributes: Attributes) extends Traversal final case class PushAttributes(attributes: Attributes) extends Traversal
final case object PopAttributes extends Traversal final case object PopAttributes extends Traversal
@ -127,6 +139,10 @@ final case class EnterIsland(islandTag: IslandTag) extends Traversal
final case object ExitIsland extends Traversal final case object ExitIsland extends Traversal
object TraversalBuilder { object TraversalBuilder {
// The most generic function1 and function2 (also completely useless, as we have thrown away all types)
// needs to be casted once to be useful (pending runtime exception in cases of bugs).
type AnyFunction1 = Nothing Any
type AnyFunction2 = (Nothing, Nothing) Any
private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none) private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none)
@ -209,6 +225,7 @@ object TraversalBuilder {
case Pop prindent("pop mat") case Pop prindent("pop mat")
case _: Transform prindent("transform mat") case _: Transform prindent("transform mat")
case _: Compose prindent("compose mat") case _: Compose prindent("compose mat")
case _: ComposeReversed prindent("compose reversed mat")
case PushAttributes(attr) prindent("push attr " + attr) case PushAttributes(attr) prindent("push attr " + attr)
case PopAttributes prindent("pop attr") case PopAttributes prindent("pop attr")
case EnterIsland(tag) prindent("enter island " + tag) case EnterIsland(tag) prindent("enter island " + tag)
@ -273,13 +290,13 @@ sealed trait TraversalBuilder {
* *
* See append in the [[LinearTraversalBuilder]] for a more efficient alternative for linear graphs. * See append in the [[LinearTraversalBuilder]] for a more efficient alternative for linear graphs.
*/ */
def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder
/** /**
* Maps the materialized value produced by the module built-up so far with the provided function, providing a new * Maps the materialized value produced by the module built-up so far with the provided function, providing a new
* TraversalBuilder returning the mapped materialized value. * TraversalBuilder returning the mapped materialized value.
*/ */
def transformMat[A, B](f: A B): TraversalBuilder def transformMat(f: AnyFunction1): TraversalBuilder
protected def internalSetAttributes(attributes: Attributes): TraversalBuilder protected def internalSetAttributes(attributes: Attributes): TraversalBuilder
@ -370,7 +387,7 @@ final case class CompletedTraversalBuilder(
attributes: Attributes, attributes: Attributes,
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = { override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
val key = new BuilderKey val key = new BuilderKey
CompositeTraversalBuilder( CompositeTraversalBuilder(
reverseBuildSteps = key :: Nil, reverseBuildSteps = key :: Nil,
@ -390,8 +407,8 @@ final case class CompletedTraversalBuilder(
else PushAttributes(attributes).concat(withIsland).concat(PopAttributes) else PushAttributes(attributes).concat(withIsland).concat(PopAttributes)
} }
override def transformMat[A, B](f: (A) B): TraversalBuilder = override def transformMat(f: AnyFunction1): TraversalBuilder =
copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any Any]))) copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
override def offsetOf(in: InPort): Int = inToOffset(in) override def offsetOf(in: InPort): Int = inToOffset(in)
@ -431,7 +448,7 @@ final case class AtomicTraversalBuilder(
unwiredOuts: Int, unwiredOuts: Int,
attributes: Attributes) extends TraversalBuilder { attributes: Attributes) extends TraversalBuilder {
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = { override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
// TODO: Use automatically a linear builder if applicable // TODO: Use automatically a linear builder if applicable
// Create a composite, add ourselves, then the other. // Create a composite, add ourselves, then the other.
CompositeTraversalBuilder(attributes = attributes) CompositeTraversalBuilder(attributes = attributes)
@ -439,7 +456,7 @@ final case class AtomicTraversalBuilder(
.add(submodule, shape, combineMat) .add(submodule, shape, combineMat)
} }
override def transformMat[A, B](f: (A) B): TraversalBuilder = override def transformMat(f: AnyFunction1): TraversalBuilder =
TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f) TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f)
override val inSlots: Int = module.shape.inlets.size override val inSlots: Int = module.shape.inlets.size
@ -519,19 +536,21 @@ object LinearTraversalBuilder {
attributes) attributes)
} }
def addMatCompose[A, B](t: Traversal, matCompose: (A, B) Any): Traversal = { def addMatCompose(t: Traversal, matCompose: AnyFunction2): Traversal = {
if (matCompose eq Keep.left) if (matCompose eq Keep.left)
Pop.concat(t) Pop.concat(t)
else if (matCompose eq Keep.right) else if (matCompose eq Keep.right)
t.concat(Pop) t.concat(Pop)
else // TODO: Optimize this case so the extra function allocation is not needed. Maybe ReverseCompose? else if (matCompose eq Keep.none)
t.concat(Compose((second, first) matCompose.asInstanceOf[(Any, Any) Any](first, second))) t.concat(Pop).concat(Pop).concat(PushNotUsed)
else
t.concat(ComposeReversed(matCompose))
} }
def fromBuilder[A, B]( def fromBuilder(
traversalBuilder: TraversalBuilder, traversalBuilder: TraversalBuilder,
shape: Shape, shape: Shape,
combine: (A, B) Any = Keep.right[A, B]): LinearTraversalBuilder = { combine: AnyFunction2 = Keep.right): LinearTraversalBuilder = {
traversalBuilder match { traversalBuilder match {
case linear: LinearTraversalBuilder case linear: LinearTraversalBuilder
if (combine eq Keep.right) linear if (combine eq Keep.right) linear
@ -596,7 +615,7 @@ final case class LinearTraversalBuilder(
protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = { override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
throw new UnsupportedOperationException("LinearTraversal does not support free-form addition. Add it into a" + throw new UnsupportedOperationException("LinearTraversal does not support free-form addition. Add it into a" +
"composite builder instead and add the second module to that.") "composite builder instead and add the second module to that.")
} }
@ -716,7 +735,7 @@ final case class LinearTraversalBuilder(
override def unwiredOuts: Int = if (outPort.isDefined) 1 else 0 override def unwiredOuts: Int = if (outPort.isDefined) 1 else 0
def append[A, B, C](toAppend: TraversalBuilder, shape: Shape, matCompose: (A, B) C): LinearTraversalBuilder = def append(toAppend: TraversalBuilder, shape: Shape, matCompose: AnyFunction2): LinearTraversalBuilder =
append(LinearTraversalBuilder.fromBuilder(toAppend, shape, Keep.right), matCompose) append(LinearTraversalBuilder.fromBuilder(toAppend, shape, Keep.right), matCompose)
// We don't really need the Shape for the linear append, but it is nicer to keep the API uniform here // We don't really need the Shape for the linear append, but it is nicer to keep the API uniform here
@ -724,7 +743,7 @@ final case class LinearTraversalBuilder(
* Append any builder that is linear shaped (have at most one input and at most one output port) to the * Append any builder that is linear shaped (have at most one input and at most one output port) to the
* end of this graph, connecting the output of the last module to the input of the appended module. * end of this graph, connecting the output of the last module to the input of the appended module.
*/ */
def append[A, B, C](toAppend: LinearTraversalBuilder, matCompose: (A, B) C): LinearTraversalBuilder = { def append(toAppend: LinearTraversalBuilder, matCompose: AnyFunction2): LinearTraversalBuilder = {
if (toAppend.isEmpty) { if (toAppend.isEmpty) {
copy( copy(
@ -945,8 +964,8 @@ final case class LinearTraversalBuilder(
} }
override def transformMat[A, B](f: (A) B): LinearTraversalBuilder = { override def transformMat(f: AnyFunction1): LinearTraversalBuilder = {
copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any Any]))) copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
} }
/** /**
@ -1097,7 +1116,7 @@ final case class CompositeTraversalBuilder(
} }
// Requires that a remapped Shape's ports contain the same ID as their target ports! // Requires that a remapped Shape's ports contain the same ID as their target ports!
def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = { def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
val builderKey = new BuilderKey val builderKey = new BuilderKey
val newBuildSteps = val newBuildSteps =
@ -1116,7 +1135,7 @@ final case class CompositeTraversalBuilder(
builderKey :: builderKey ::
reverseBuildSteps reverseBuildSteps
} else { } else {
AppendTraversal(Compose(combineMat.asInstanceOf[(Any, Any) Any])) :: AppendTraversal(Compose(combineMat)) ::
builderKey :: builderKey ::
reverseBuildSteps reverseBuildSteps
} }
@ -1183,8 +1202,8 @@ final case class CompositeTraversalBuilder(
copy(inOffsets = inOffsets - in).assign(out, offsetOf(in) - offsetOfModule(out)) copy(inOffsets = inOffsets - in).assign(out, offsetOf(in) - offsetOfModule(out))
} }
override def transformMat[A, B](f: (A) B): TraversalBuilder = { override def transformMat(f: AnyFunction1): TraversalBuilder = {
copy(finalSteps = finalSteps.concat(Transform(f.asInstanceOf[Any Any]))) copy(finalSteps = finalSteps.concat(Transform(f)))
} }
override def makeIsland(islandTag: IslandTag): TraversalBuilder = { override def makeIsland(islandTag: IslandTag): TraversalBuilder = {