=str simple signatures of methods having combineMat parameter + optimize ComposeReversed
Now all the `asInstanceOf` occurrences are bundled in a single place (ComposeOp.apply and Transform.apply).
This commit is contained in:
parent
c8097e4876
commit
c1dfee64ae
5 changed files with 56 additions and 39 deletions
|
|
@ -428,7 +428,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
|
|||
"keep mapped materialized value of empty builder" in {
|
||||
val builder =
|
||||
LinearTraversalBuilder.empty()
|
||||
.transformMat[NotUsed, String](_ ⇒ "NOTUSED")
|
||||
.transformMat((_: Any) ⇒ "NOTUSED")
|
||||
.append(source.traversalBuilder, source.shape, Keep.left)
|
||||
.append(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
|
||||
|
|
@ -556,7 +556,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
|
|||
val builder = source.traversalBuilder
|
||||
.append(flow1.traversalBuilder, flow1.shape, Keep.right)
|
||||
.append(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
.transformMat[String, String]("MAPPED: " + _)
|
||||
.transformMat("MAPPED: " + (_: String))
|
||||
|
||||
val mat = testMaterialize(builder)
|
||||
|
||||
|
|
@ -566,12 +566,12 @@ class LinearTraversalBuilderSpec extends AkkaSpec {
|
|||
"properly map materialized value (nested)" in {
|
||||
val flowBuilder =
|
||||
flow1.traversalBuilder
|
||||
.transformMat[String, String]("M1: " + _)
|
||||
.transformMat("M1: " + (_: String))
|
||||
|
||||
val builder = source.traversalBuilder
|
||||
.append(flowBuilder, flow1.shape, Keep.right)
|
||||
.append(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
.transformMat[String, String]("M2: " + _)
|
||||
.transformMat("M2: " + (_: String))
|
||||
|
||||
val mat = testMaterialize(builder)
|
||||
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
|||
"keep mapped materialized value of empty builder" in {
|
||||
val builder =
|
||||
TraversalBuilder.empty()
|
||||
.transformMat[NotUsed, String](_ ⇒ "NOTUSED")
|
||||
.transformMat((_: Any) ⇒ "NOTUSED")
|
||||
.add(source.traversalBuilder, source.shape, Keep.left)
|
||||
.add(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
.wire(source.out, sink.in)
|
||||
|
|
@ -336,7 +336,7 @@ class TraversalBuilderSpec extends AkkaSpec {
|
|||
.add(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
.wire(source.out, flow1.in)
|
||||
.wire(flow1.out, sink.in)
|
||||
.transformMat[String, String]("MAPPED: " + _)
|
||||
.transformMat("MAPPED: " + (_: String))
|
||||
|
||||
val mat = testMaterialize(builder)
|
||||
|
||||
|
|
@ -346,14 +346,14 @@ class TraversalBuilderSpec extends AkkaSpec {
|
|||
"properly map materialized value (nested)" in {
|
||||
val flowBuilder =
|
||||
flow1.traversalBuilder
|
||||
.transformMat[String, String]("M1: " + _)
|
||||
.transformMat("M1: " + (_: String))
|
||||
|
||||
val builder = source.traversalBuilder
|
||||
.add(flowBuilder, flow1.shape, Keep.right)
|
||||
.add(sink.traversalBuilder, sink.shape, Keep.left)
|
||||
.wire(source.out, flow1.in)
|
||||
.wire(flow1.out, sink.in)
|
||||
.transformMat[String, String]("M2: " + _)
|
||||
.transformMat("M2: " + (_: String))
|
||||
|
||||
val mat = testMaterialize(builder)
|
||||
|
||||
|
|
|
|||
|
|
@ -148,14 +148,14 @@ object TraversalTestUtils {
|
|||
matValueStack.removeLast()
|
||||
case PushNotUsed ⇒
|
||||
matValueStack.addLast(NotUsed)
|
||||
case Transform(f) ⇒
|
||||
case transform: Transform ⇒
|
||||
val prev = matValueStack.removeLast()
|
||||
val result = f(prev)
|
||||
val result = transform(prev)
|
||||
matValueStack.addLast(result)
|
||||
case Compose(f) ⇒
|
||||
case compose: ComposeOp ⇒
|
||||
val second = matValueStack.removeLast()
|
||||
val first = matValueStack.removeLast()
|
||||
val result = f(first, second)
|
||||
val result = compose(first, second)
|
||||
matValueStack.addLast(result)
|
||||
case PushAttributes(attr) ⇒
|
||||
attributesStack.addLast(attributesStack.getLast and attr)
|
||||
|
|
|
|||
|
|
@ -463,15 +463,15 @@ case class PhasedFusingActorMaterializer(
|
|||
case PushNotUsed ⇒
|
||||
matValueStack.addLast(NotUsed)
|
||||
if (Debug) println(s"PUSH: NotUsed => $matValueStack")
|
||||
case Transform(f) ⇒
|
||||
case transform: Transform ⇒
|
||||
val prev = matValueStack.removeLast()
|
||||
val result = f(prev)
|
||||
val result = transform(prev)
|
||||
matValueStack.addLast(result)
|
||||
if (Debug) println(s"TRFM: $matValueStack")
|
||||
case Compose(f) ⇒
|
||||
case compose: ComposeOp ⇒
|
||||
val second = matValueStack.removeLast()
|
||||
val first = matValueStack.removeLast()
|
||||
val result = f(first, second)
|
||||
val result = compose(first, second)
|
||||
matValueStack.addLast(result)
|
||||
if (Debug) println(s"COMP: $matValueStack")
|
||||
case PushAttributes(attr) ⇒
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.stream.impl
|
|||
|
||||
import akka.stream._
|
||||
import akka.stream.impl.StreamLayout.AtomicModule
|
||||
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
|
||||
import akka.stream.scaladsl.Keep
|
||||
|
||||
/**
|
||||
|
|
@ -114,8 +115,19 @@ sealed trait MaterializedValueOp extends Traversal
|
|||
|
||||
case object Pop extends MaterializedValueOp
|
||||
case object PushNotUsed extends MaterializedValueOp
|
||||
final case class Transform(mapper: Any ⇒ Any) extends MaterializedValueOp
|
||||
final case class Compose(composer: (Any, Any) ⇒ Any) extends MaterializedValueOp
|
||||
final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
|
||||
def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg)
|
||||
}
|
||||
trait ComposeOp extends MaterializedValueOp {
|
||||
def apply(arg1: Any, arg2: Any): Any
|
||||
}
|
||||
final case class Compose(composer: AnyFunction2) extends ComposeOp {
|
||||
def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) ⇒ Any](arg1, arg2)
|
||||
}
|
||||
/** An optimization which applies the arguments in reverse order */
|
||||
final case class ComposeReversed(composer: AnyFunction2) extends ComposeOp {
|
||||
def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) ⇒ Any](arg2, arg1)
|
||||
}
|
||||
|
||||
final case class PushAttributes(attributes: Attributes) extends Traversal
|
||||
final case object PopAttributes extends Traversal
|
||||
|
|
@ -124,6 +136,10 @@ final case class EnterIsland(islandTag: IslandTag) extends Traversal
|
|||
final case object ExitIsland extends Traversal
|
||||
|
||||
object TraversalBuilder {
|
||||
// The most generic function1 and function2 (also completely useless, as we have thrown away all types)
|
||||
// needs to be casted once to be useful (pending runtime exception in cases of bugs).
|
||||
type AnyFunction1 = Nothing ⇒ Any
|
||||
type AnyFunction2 = (Nothing, Nothing) ⇒ Any
|
||||
|
||||
private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none)
|
||||
|
||||
|
|
@ -192,6 +208,7 @@ object TraversalBuilder {
|
|||
case Pop ⇒ prindent("pop mat")
|
||||
case _: Transform ⇒ prindent("transform mat")
|
||||
case _: Compose ⇒ prindent("compose mat")
|
||||
case _: ComposeReversed ⇒ prindent("compose reversed mat")
|
||||
case PushAttributes(attr) ⇒ prindent("push attr " + attr)
|
||||
case PopAttributes ⇒ prindent("pop attr")
|
||||
case EnterIsland(tag) ⇒ prindent("enter island " + tag)
|
||||
|
|
@ -256,13 +273,13 @@ sealed trait TraversalBuilder {
|
|||
*
|
||||
* See append in the [[LinearTraversalBuilder]] for a more efficient alternative for linear graphs.
|
||||
*/
|
||||
def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder
|
||||
def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder
|
||||
|
||||
/**
|
||||
* Maps the materialized value produced by the module built-up so far with the provided function, providing a new
|
||||
* TraversalBuilder returning the mapped materialized value.
|
||||
*/
|
||||
def transformMat[A, B](f: A ⇒ B): TraversalBuilder
|
||||
def transformMat(f: AnyFunction1): TraversalBuilder
|
||||
|
||||
protected def internalSetAttributes(attributes: Attributes): TraversalBuilder
|
||||
|
||||
|
|
@ -353,7 +370,7 @@ final case class CompletedTraversalBuilder(
|
|||
attributes: Attributes,
|
||||
islandTag: Option[IslandTag] = None) extends TraversalBuilder {
|
||||
|
||||
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = {
|
||||
override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
|
||||
val key = new BuilderKey
|
||||
CompositeTraversalBuilder(
|
||||
reverseBuildSteps = key :: Nil,
|
||||
|
|
@ -373,8 +390,8 @@ final case class CompletedTraversalBuilder(
|
|||
else PushAttributes(attributes).concat(withIsland).concat(PopAttributes)
|
||||
}
|
||||
|
||||
override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder =
|
||||
copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any ⇒ Any])))
|
||||
override def transformMat(f: AnyFunction1): TraversalBuilder =
|
||||
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
|
||||
|
||||
override def offsetOf(in: InPort): Int = inToOffset(in)
|
||||
|
||||
|
|
@ -414,7 +431,7 @@ final case class AtomicTraversalBuilder(
|
|||
unwiredOuts: Int,
|
||||
attributes: Attributes) extends TraversalBuilder {
|
||||
|
||||
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = {
|
||||
override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
|
||||
// TODO: Use automatically a linear builder if applicable
|
||||
// Create a composite, add ourselves, then the other.
|
||||
CompositeTraversalBuilder(attributes = attributes)
|
||||
|
|
@ -422,7 +439,7 @@ final case class AtomicTraversalBuilder(
|
|||
.add(submodule, shape, combineMat)
|
||||
}
|
||||
|
||||
override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder =
|
||||
override def transformMat(f: AnyFunction1): TraversalBuilder =
|
||||
TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f)
|
||||
|
||||
override val inSlots: Int = module.shape.inlets.size
|
||||
|
|
@ -497,19 +514,19 @@ object LinearTraversalBuilder {
|
|||
attributes)
|
||||
}
|
||||
|
||||
def addMatCompose[A, B](t: Traversal, matCompose: (A, B) ⇒ Any): Traversal = {
|
||||
def addMatCompose(t: Traversal, matCompose: AnyFunction2): Traversal = {
|
||||
if (matCompose eq Keep.left)
|
||||
Pop.concat(t)
|
||||
else if (matCompose eq Keep.right)
|
||||
t.concat(Pop)
|
||||
else // TODO: Optimize this case so the extra function allocation is not needed. Maybe ReverseCompose?
|
||||
t.concat(Compose((second, first) ⇒ matCompose.asInstanceOf[(Any, Any) ⇒ Any](first, second)))
|
||||
else
|
||||
t.concat(ComposeReversed(matCompose))
|
||||
}
|
||||
|
||||
def fromBuilder[A, B](
|
||||
def fromBuilder(
|
||||
traversalBuilder: TraversalBuilder,
|
||||
shape: Shape,
|
||||
combine: (A, B) ⇒ Any = Keep.right[A, B]): LinearTraversalBuilder = {
|
||||
combine: AnyFunction2 = Keep.right): LinearTraversalBuilder = {
|
||||
traversalBuilder match {
|
||||
case linear: LinearTraversalBuilder ⇒
|
||||
if (combine eq Keep.right) linear
|
||||
|
|
@ -574,7 +591,7 @@ final case class LinearTraversalBuilder(
|
|||
|
||||
protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty
|
||||
|
||||
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = {
|
||||
override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
|
||||
throw new UnsupportedOperationException("LinearTraversal does not support free-form addition. Add it into a" +
|
||||
"composite builder instead and add the second module to that.")
|
||||
}
|
||||
|
|
@ -688,7 +705,7 @@ final case class LinearTraversalBuilder(
|
|||
|
||||
override def unwiredOuts: Int = if (outPort.isDefined) 1 else 0
|
||||
|
||||
def append[A, B, C](toAppend: TraversalBuilder, shape: Shape, matCompose: (A, B) ⇒ C): LinearTraversalBuilder =
|
||||
def append(toAppend: TraversalBuilder, shape: Shape, matCompose: AnyFunction2): LinearTraversalBuilder =
|
||||
append(LinearTraversalBuilder.fromBuilder(toAppend, shape, Keep.right), matCompose)
|
||||
|
||||
// We don't really need the Shape for the linear append, but it is nicer to keep the API uniform here
|
||||
|
|
@ -696,7 +713,7 @@ final case class LinearTraversalBuilder(
|
|||
* Append any builder that is linear shaped (have at most one input and at most one output port) to the
|
||||
* end of this graph, connecting the output of the last module to the input of the appended module.
|
||||
*/
|
||||
def append[A, B, C](toAppend: LinearTraversalBuilder, matCompose: (A, B) ⇒ C): LinearTraversalBuilder = {
|
||||
def append(toAppend: LinearTraversalBuilder, matCompose: AnyFunction2): LinearTraversalBuilder = {
|
||||
|
||||
if (toAppend.isEmpty) {
|
||||
copy(
|
||||
|
|
@ -917,8 +934,8 @@ final case class LinearTraversalBuilder(
|
|||
|
||||
}
|
||||
|
||||
override def transformMat[A, B](f: (A) ⇒ B): LinearTraversalBuilder = {
|
||||
copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any ⇒ Any])))
|
||||
override def transformMat(f: AnyFunction1): LinearTraversalBuilder = {
|
||||
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1066,7 +1083,7 @@ final case class CompositeTraversalBuilder(
|
|||
}
|
||||
|
||||
// Requires that a remapped Shape's ports contain the same ID as their target ports!
|
||||
def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = {
|
||||
def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = {
|
||||
val builderKey = new BuilderKey
|
||||
|
||||
val newBuildSteps =
|
||||
|
|
@ -1085,7 +1102,7 @@ final case class CompositeTraversalBuilder(
|
|||
builderKey ::
|
||||
reverseBuildSteps
|
||||
} else {
|
||||
AppendTraversal(Compose(combineMat.asInstanceOf[(Any, Any) ⇒ Any])) ::
|
||||
AppendTraversal(Compose(combineMat)) ::
|
||||
builderKey ::
|
||||
reverseBuildSteps
|
||||
}
|
||||
|
|
@ -1152,8 +1169,8 @@ final case class CompositeTraversalBuilder(
|
|||
copy(inOffsets = inOffsets - in).assign(out, offsetOf(in) - offsetOfModule(out))
|
||||
}
|
||||
|
||||
override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder = {
|
||||
copy(finalSteps = finalSteps.concat(Transform(f.asInstanceOf[Any ⇒ Any])))
|
||||
override def transformMat(f: AnyFunction1): TraversalBuilder = {
|
||||
copy(finalSteps = finalSteps.concat(Transform(f)))
|
||||
}
|
||||
|
||||
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue