From 9f84be6d3e3d586e553dc25455086ed548fb60ca Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 9 Mar 2017 20:44:59 +0100 Subject: [PATCH] optimize ins and outs of size <= 1 * avoid iterator allocation --- .../main/scala/akka/stream/Attributes.scala | 4 +- .../src/main/scala/akka/stream/Shape.scala | 17 ++++++ .../impl/PhasedFusingActorMaterializer.scala | 53 ++++++++++++++----- .../akka/stream/impl/TraversalBuilder.scala | 50 +++++++++++------ .../scala/akka/stream/scaladsl/Graph.scala | 23 ++++++-- 5 files changed, 111 insertions(+), 36 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 325fb1f449..7c0cc602a1 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -171,7 +171,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { * Extracts Name attributes and concatenates them. */ def nameLifted: Option[String] = { - @tailrec def concatNames(i: Iterator[Attribute], first: String, buf: StringBuilder): String = + @tailrec def concatNames(i: Iterator[Attribute], first: String, buf: java.lang.StringBuilder): String = if (i.hasNext) i.next() match { case Name(n) ⇒ @@ -179,7 +179,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { val nn = URLEncoder.encode(n, "UTF-8") if (buf ne null) concatNames(i, null, buf.append('-').append(nn)) else if (first ne null) { - val b = new StringBuilder((first.length() + nn.length()) * 2) + val b = new java.lang.StringBuilder((first.length() + nn.length()) * 2) concatNames(i, null, b.append(first).append('-').append(nn)) } else concatNames(i, nn, null) case _ ⇒ concatNames(i, first, buf) diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 378b3087ed..7a5d96acff 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -7,6 +7,7 @@ import akka.util.Collections.EmptyImmutableSeq import scala.collection.immutable import scala.collection.JavaConverters._ import scala.annotation.unchecked.uncheckedVariance +import akka.annotation.InternalApi /** * An input port of a StreamLayout.Module. This type logically belongs @@ -138,6 +139,22 @@ final class Outlet[T] private (val s: String) extends OutPort { else s" mapped to $mappedTo") } +/** + * INTERNAL API + */ +@InternalApi private[akka] object Shape { + /** + * `inlets` and `outlets` can be `Vector` or `List` so this method + * checks the size of 1 in an optimized way. + */ + def hasOnePort(ports: immutable.Seq[_]): Boolean = { + ports.nonEmpty && (ports match { + case l: List[_] ⇒ l.tail.isEmpty // assuming List is most common + case _ ⇒ ports.size == 1 // e.g. Vector + }) + } +} + /** * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the * philosophy that a Graph is a freely reusable blueprint, everything that diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 8ca98533da..63a260879a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -450,21 +450,10 @@ case class PhasedFusingActorMaterializer( if (Debug) println(s" materialized value is $matValue") matValueStack.addLast(matValue) - val ins = mod.shape.inlets.iterator val stageGlobalOffset = islandTracking.getCurrentOffset - while (ins.hasNext) { - val in = ins.next() - islandTracking.wireIn(in, logic) - } - - val outs = mod.shape.outlets.iterator - while (outs.hasNext) { - val out = outs.next() - val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) - if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") - islandTracking.wireOut(out, absoluteTargetSlot, logic) - } + wireInlets(islandTracking, mod, logic) + wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot) if (Debug) println(s"PUSH: $matValue => $matValueStack") @@ -511,6 +500,44 @@ case class PhasedFusingActorMaterializer( matValueStack.peekLast().asInstanceOf[Mat] } + private def wireInlets(islandTracking: IslandTracking, mod: StreamLayout.AtomicModule[Shape, Any], logic: Any): Unit = { + val inlets = mod.shape.inlets + if (inlets.nonEmpty) { + if (Shape.hasOnePort(inlets)) { + // optimization, duplication to avoid iterator allocation + islandTracking.wireIn(inlets.head, logic) + } else { + val ins = inlets.iterator + while (ins.hasNext) { + val in = ins.next() + islandTracking.wireIn(in, logic) + } + } + } + } + + private def wireOutlets(islandTracking: IslandTracking, mod: StreamLayout.AtomicModule[Shape, Any], logic: Any, + stageGlobalOffset: Int, outToSlot: Array[Int]): Unit = { + val outlets = mod.shape.outlets + if (outlets.nonEmpty) { + if (Shape.hasOnePort(outlets)) { + // optimization, duplication to avoid iterator allocation + val out = outlets.head + val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) + if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") + islandTracking.wireOut(out, absoluteTargetSlot, logic) + } else { + val outs = outlets.iterator + while (outs.hasNext) { + val out = outs.next() + val absoluteTargetSlot = stageGlobalOffset + outToSlot(out.id) + if (Debug) println(s" wiring offset: ${outToSlot.mkString("[", ",", "]")}") + islandTracking.wireOut(out, absoluteTargetSlot, logic) + } + } + } + } + override def makeLogger(logSource: Class[_]): LoggingAdapter = Logging(system, logSource) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index ad13b0ba51..9243a49c0d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -9,6 +9,7 @@ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.scaladsl.Keep import akka.util.OptionVal import scala.language.existentials +import scala.collection.immutable.Map.Map1 /** * Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead @@ -135,18 +136,32 @@ object TraversalBuilder { */ private[impl] def initShape(shape: Shape): Unit = { // Initialize port IDs - val inIter = shape.inlets.iterator - var i = 0 - while (inIter.hasNext) { - inIter.next.id = i - i += 1 + val inlets = shape.inlets + if (inlets.nonEmpty) { + if (Shape.hasOnePort(inlets)) + inlets.head.id = 0 + else { + val inIter = inlets.iterator + var i = 0 + while (inIter.hasNext) { + inIter.next().id = i + i += 1 + } + } } - val outIter = shape.outlets.iterator - i = 0 - while (outIter.hasNext) { - outIter.next.id = i - i += 1 + val outlets = shape.outlets + if (outlets.nonEmpty) { + if (Shape.hasOnePort(outlets)) + outlets.head.id = 0 + else { + val outIter = shape.outlets.iterator + var i = 0 + while (outIter.hasNext) { + outIter.next().id = i + i += 1 + } + } } } @@ -447,12 +462,17 @@ final case class AtomicTraversalBuilder( // Check if every output port has been assigned, if yes, we have a Traversal for this module. val newUnwiredOuts = unwiredOuts - 1 if (newUnwiredOuts == 0) { + val inToOffset: Map[InPort, Int] = { + val inlets = module.shape.inlets + if (inlets.isEmpty) Map.empty + else if (Shape.hasOnePort(inlets)) new Map1(inlets.head, inlets.head.id) + else inlets.map(in ⇒ in.asInstanceOf[InPort] → in.id)(collection.breakOut) + } CompletedTraversalBuilder( traversalSoFar = MaterializeAtomic(module, newOutToSlot), - inSlots = inSlots, - // TODO Optimize Map creation - inToOffset = module.shape.inlets.iterator.map(in ⇒ in → in.id).toMap, - attributes = attributes) + inSlots, + inToOffset, + attributes) } else copy(outToSlot = newOutToSlot, unwiredOuts = newUnwiredOuts) } @@ -638,8 +658,6 @@ final case class LinearTraversalBuilder( .assign(out, inOffset - composite.offsetOfModule(out)) .traversal).concat(traversalSoFar)), pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) - case OptionVal.None ⇒ - copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) case OptionVal.None ⇒ copy( inPort = OptionVal.None, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 11bf3b4eb1..52268d8684 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -88,7 +88,13 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr private var runningUpstreams = inputPorts private def upstreamsClosed = runningUpstreams == 0 - override def preStart(): Unit = in.foreach(tryPull) + override def preStart(): Unit = { + var ix = 0 + while (ix < in.size) { + tryPull(in(ix)) + ix += 1 + } + } @tailrec private def dequeueAndDispatch(): Unit = { @@ -107,7 +113,11 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr } } - in.foreach { i ⇒ + var ix = 0 + while (ix < in.size) { + val i = in(ix) + ix += 1 + setHandler(i, new InHandler { override def onPush(): Unit = { if (isAvailable(out)) { @@ -120,7 +130,11 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr override def onUpstreamFinish() = if (eagerComplete) { - in.foreach(cancel) + var ix2 = 0 + while (ix2 < in.size) { + cancel(in(ix2)) + ix2 += 1 + } runningUpstreams = 0 if (!pending) completeStage() } else { @@ -1069,8 +1083,7 @@ object GraphDSL extends GraphApply { traversalBuilderInProgress = traversalBuilderInProgress.add( graph.traversalBuilder.transformMat(transform), newShape, - Keep.right - ) + Keep.right) unwiredIns ++= newShape.inlets unwiredOuts ++= newShape.outlets