From c1dfee64ae38b22d75f79f0724cdab8178817424 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 9 Mar 2017 15:43:21 +0100 Subject: [PATCH 1/2] =str simple signatures of methods having combineMat parameter + optimize ComposeReversed Now all the `asInstanceOf` occurrences are bundled in a single place (ComposeOp.apply and Transform.apply). --- .../impl/LinearTraversalBuilderSpec.scala | 8 +-- .../stream/impl/TraversalBuilderSpec.scala | 8 +-- .../akka/stream/impl/TraversalTestUtils.scala | 8 +-- .../impl/PhasedFusingActorMaterializer.scala | 8 +-- .../akka/stream/impl/TraversalBuilder.scala | 63 ++++++++++++------- 5 files changed, 56 insertions(+), 39 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/LinearTraversalBuilderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/LinearTraversalBuilderSpec.scala index 98724817c0..cc167d256f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/LinearTraversalBuilderSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/LinearTraversalBuilderSpec.scala @@ -428,7 +428,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec { "keep mapped materialized value of empty builder" in { val builder = LinearTraversalBuilder.empty() - .transformMat[NotUsed, String](_ ⇒ "NOTUSED") + .transformMat((_: Any) ⇒ "NOTUSED") .append(source.traversalBuilder, source.shape, Keep.left) .append(sink.traversalBuilder, sink.shape, Keep.left) @@ -556,7 +556,7 @@ class LinearTraversalBuilderSpec extends AkkaSpec { val builder = source.traversalBuilder .append(flow1.traversalBuilder, flow1.shape, Keep.right) .append(sink.traversalBuilder, sink.shape, Keep.left) - .transformMat[String, String]("MAPPED: " + _) + .transformMat("MAPPED: " + (_: String)) val mat = testMaterialize(builder) @@ -566,12 +566,12 @@ class LinearTraversalBuilderSpec extends AkkaSpec { "properly map materialized value (nested)" in { val flowBuilder = flow1.traversalBuilder - .transformMat[String, String]("M1: " + _) + .transformMat("M1: " + (_: String)) val builder = source.traversalBuilder .append(flowBuilder, flow1.shape, Keep.right) .append(sink.traversalBuilder, sink.shape, Keep.left) - .transformMat[String, String]("M2: " + _) + .transformMat("M2: " + (_: String)) val mat = testMaterialize(builder) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala index 8db2629a77..476178ee42 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala @@ -238,7 +238,7 @@ class TraversalBuilderSpec extends AkkaSpec { "keep mapped materialized value of empty builder" in { val builder = TraversalBuilder.empty() - .transformMat[NotUsed, String](_ ⇒ "NOTUSED") + .transformMat((_: Any) ⇒ "NOTUSED") .add(source.traversalBuilder, source.shape, Keep.left) .add(sink.traversalBuilder, sink.shape, Keep.left) .wire(source.out, sink.in) @@ -336,7 +336,7 @@ class TraversalBuilderSpec extends AkkaSpec { .add(sink.traversalBuilder, sink.shape, Keep.left) .wire(source.out, flow1.in) .wire(flow1.out, sink.in) - .transformMat[String, String]("MAPPED: " + _) + .transformMat("MAPPED: " + (_: String)) val mat = testMaterialize(builder) @@ -346,14 +346,14 @@ class TraversalBuilderSpec extends AkkaSpec { "properly map materialized value (nested)" in { val flowBuilder = flow1.traversalBuilder - .transformMat[String, String]("M1: " + _) + .transformMat("M1: " + (_: String)) val builder = source.traversalBuilder .add(flowBuilder, flow1.shape, Keep.right) .add(sink.traversalBuilder, sink.shape, Keep.left) .wire(source.out, flow1.in) .wire(flow1.out, sink.in) - .transformMat[String, String]("M2: " + _) + .transformMat("M2: " + (_: String)) val mat = testMaterialize(builder) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalTestUtils.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalTestUtils.scala index 9b4b8c94bf..ba95b3e56e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalTestUtils.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalTestUtils.scala @@ -148,14 +148,14 @@ object TraversalTestUtils { matValueStack.removeLast() case PushNotUsed ⇒ matValueStack.addLast(NotUsed) - case Transform(f) ⇒ + case transform: Transform ⇒ val prev = matValueStack.removeLast() - val result = f(prev) + val result = transform(prev) matValueStack.addLast(result) - case Compose(f) ⇒ + case compose: ComposeOp ⇒ val second = matValueStack.removeLast() val first = matValueStack.removeLast() - val result = f(first, second) + val result = compose(first, second) matValueStack.addLast(result) case PushAttributes(attr) ⇒ attributesStack.addLast(attributesStack.getLast and attr) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index ff74cdbbc8..02175ae084 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -463,15 +463,15 @@ case class PhasedFusingActorMaterializer( case PushNotUsed ⇒ matValueStack.addLast(NotUsed) if (Debug) println(s"PUSH: NotUsed => $matValueStack") - case Transform(f) ⇒ + case transform: Transform ⇒ val prev = matValueStack.removeLast() - val result = f(prev) + val result = transform(prev) matValueStack.addLast(result) if (Debug) println(s"TRFM: $matValueStack") - case Compose(f) ⇒ + case compose: ComposeOp ⇒ val second = matValueStack.removeLast() val first = matValueStack.removeLast() - val result = f(first, second) + val result = compose(first, second) matValueStack.addLast(result) if (Debug) println(s"COMP: $matValueStack") case PushAttributes(attr) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 74da492efb..a4a78adc93 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -6,6 +6,7 @@ package akka.stream.impl import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule +import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 } import akka.stream.scaladsl.Keep /** @@ -114,8 +115,19 @@ sealed trait MaterializedValueOp extends Traversal case object Pop extends MaterializedValueOp case object PushNotUsed extends MaterializedValueOp -final case class Transform(mapper: Any ⇒ Any) extends MaterializedValueOp -final case class Compose(composer: (Any, Any) ⇒ Any) extends MaterializedValueOp +final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp { + def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg) +} +trait ComposeOp extends MaterializedValueOp { + def apply(arg1: Any, arg2: Any): Any +} +final case class Compose(composer: AnyFunction2) extends ComposeOp { + def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) ⇒ Any](arg1, arg2) +} +/** An optimization which applies the arguments in reverse order */ +final case class ComposeReversed(composer: AnyFunction2) extends ComposeOp { + def apply(arg1: Any, arg2: Any): Any = composer.asInstanceOf[(Any, Any) ⇒ Any](arg2, arg1) +} final case class PushAttributes(attributes: Attributes) extends Traversal final case object PopAttributes extends Traversal @@ -124,6 +136,10 @@ final case class EnterIsland(islandTag: IslandTag) extends Traversal final case object ExitIsland extends Traversal object TraversalBuilder { + // The most generic function1 and function2 (also completely useless, as we have thrown away all types) + // needs to be casted once to be useful (pending runtime exception in cases of bugs). + type AnyFunction1 = Nothing ⇒ Any + type AnyFunction2 = (Nothing, Nothing) ⇒ Any private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none) @@ -192,6 +208,7 @@ object TraversalBuilder { case Pop ⇒ prindent("pop mat") case _: Transform ⇒ prindent("transform mat") case _: Compose ⇒ prindent("compose mat") + case _: ComposeReversed ⇒ prindent("compose reversed mat") case PushAttributes(attr) ⇒ prindent("push attr " + attr) case PopAttributes ⇒ prindent("pop attr") case EnterIsland(tag) ⇒ prindent("enter island " + tag) @@ -256,13 +273,13 @@ sealed trait TraversalBuilder { * * See append in the [[LinearTraversalBuilder]] for a more efficient alternative for linear graphs. */ - def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder + def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder /** * Maps the materialized value produced by the module built-up so far with the provided function, providing a new * TraversalBuilder returning the mapped materialized value. */ - def transformMat[A, B](f: A ⇒ B): TraversalBuilder + def transformMat(f: AnyFunction1): TraversalBuilder protected def internalSetAttributes(attributes: Attributes): TraversalBuilder @@ -353,7 +370,7 @@ final case class CompletedTraversalBuilder( attributes: Attributes, islandTag: Option[IslandTag] = None) extends TraversalBuilder { - override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { + override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = { val key = new BuilderKey CompositeTraversalBuilder( reverseBuildSteps = key :: Nil, @@ -373,8 +390,8 @@ final case class CompletedTraversalBuilder( else PushAttributes(attributes).concat(withIsland).concat(PopAttributes) } - override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder = - copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any ⇒ Any]))) + override def transformMat(f: AnyFunction1): TraversalBuilder = + copy(traversalSoFar = traversalSoFar.concat(Transform(f))) override def offsetOf(in: InPort): Int = inToOffset(in) @@ -414,7 +431,7 @@ final case class AtomicTraversalBuilder( unwiredOuts: Int, attributes: Attributes) extends TraversalBuilder { - override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { + override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = { // TODO: Use automatically a linear builder if applicable // Create a composite, add ourselves, then the other. CompositeTraversalBuilder(attributes = attributes) @@ -422,7 +439,7 @@ final case class AtomicTraversalBuilder( .add(submodule, shape, combineMat) } - override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder = + override def transformMat(f: AnyFunction1): TraversalBuilder = TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f) override val inSlots: Int = module.shape.inlets.size @@ -497,19 +514,19 @@ object LinearTraversalBuilder { attributes) } - def addMatCompose[A, B](t: Traversal, matCompose: (A, B) ⇒ Any): Traversal = { + def addMatCompose(t: Traversal, matCompose: AnyFunction2): Traversal = { if (matCompose eq Keep.left) Pop.concat(t) else if (matCompose eq Keep.right) t.concat(Pop) - else // TODO: Optimize this case so the extra function allocation is not needed. Maybe ReverseCompose? - t.concat(Compose((second, first) ⇒ matCompose.asInstanceOf[(Any, Any) ⇒ Any](first, second))) + else + t.concat(ComposeReversed(matCompose)) } - def fromBuilder[A, B]( + def fromBuilder( traversalBuilder: TraversalBuilder, shape: Shape, - combine: (A, B) ⇒ Any = Keep.right[A, B]): LinearTraversalBuilder = { + combine: AnyFunction2 = Keep.right): LinearTraversalBuilder = { traversalBuilder match { case linear: LinearTraversalBuilder ⇒ if (combine eq Keep.right) linear @@ -574,7 +591,7 @@ final case class LinearTraversalBuilder( protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty - override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { + override def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = { throw new UnsupportedOperationException("LinearTraversal does not support free-form addition. Add it into a" + "composite builder instead and add the second module to that.") } @@ -688,7 +705,7 @@ final case class LinearTraversalBuilder( override def unwiredOuts: Int = if (outPort.isDefined) 1 else 0 - def append[A, B, C](toAppend: TraversalBuilder, shape: Shape, matCompose: (A, B) ⇒ C): LinearTraversalBuilder = + def append(toAppend: TraversalBuilder, shape: Shape, matCompose: AnyFunction2): LinearTraversalBuilder = append(LinearTraversalBuilder.fromBuilder(toAppend, shape, Keep.right), matCompose) // We don't really need the Shape for the linear append, but it is nicer to keep the API uniform here @@ -696,7 +713,7 @@ final case class LinearTraversalBuilder( * Append any builder that is linear shaped (have at most one input and at most one output port) to the * end of this graph, connecting the output of the last module to the input of the appended module. */ - def append[A, B, C](toAppend: LinearTraversalBuilder, matCompose: (A, B) ⇒ C): LinearTraversalBuilder = { + def append(toAppend: LinearTraversalBuilder, matCompose: AnyFunction2): LinearTraversalBuilder = { if (toAppend.isEmpty) { copy( @@ -917,8 +934,8 @@ final case class LinearTraversalBuilder( } - override def transformMat[A, B](f: (A) ⇒ B): LinearTraversalBuilder = { - copy(traversalSoFar = traversalSoFar.concat(Transform(f.asInstanceOf[Any ⇒ Any]))) + override def transformMat(f: AnyFunction1): LinearTraversalBuilder = { + copy(traversalSoFar = traversalSoFar.concat(Transform(f))) } /** @@ -1066,7 +1083,7 @@ final case class CompositeTraversalBuilder( } // Requires that a remapped Shape's ports contain the same ID as their target ports! - def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { + def add(submodule: TraversalBuilder, shape: Shape, combineMat: AnyFunction2): TraversalBuilder = { val builderKey = new BuilderKey val newBuildSteps = @@ -1085,7 +1102,7 @@ final case class CompositeTraversalBuilder( builderKey :: reverseBuildSteps } else { - AppendTraversal(Compose(combineMat.asInstanceOf[(Any, Any) ⇒ Any])) :: + AppendTraversal(Compose(combineMat)) :: builderKey :: reverseBuildSteps } @@ -1152,8 +1169,8 @@ final case class CompositeTraversalBuilder( copy(inOffsets = inOffsets - in).assign(out, offsetOf(in) - offsetOfModule(out)) } - override def transformMat[A, B](f: (A) ⇒ B): TraversalBuilder = { - copy(finalSteps = finalSteps.concat(Transform(f.asInstanceOf[Any ⇒ Any]))) + override def transformMat(f: AnyFunction1): TraversalBuilder = { + copy(finalSteps = finalSteps.concat(Transform(f))) } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { From c6ab269b994cc385a8c4a886b227f4f2e1e1ebee Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 9 Mar 2017 16:08:12 +0100 Subject: [PATCH 2/2] =str optimize Keep.none --- .../src/main/scala/akka/stream/impl/TraversalBuilder.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index a4a78adc93..bb69f09e49 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -519,6 +519,8 @@ object LinearTraversalBuilder { Pop.concat(t) else if (matCompose eq Keep.right) t.concat(Pop) + else if (matCompose eq Keep.none) + t.concat(Pop).concat(Pop).concat(PushNotUsed) else t.concat(ComposeReversed(matCompose)) }