From 86619ca32534691eb19299dea8f70beaa4d97f92 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 7 Feb 2017 15:50:15 +0100 Subject: [PATCH] use index access instad of iterator in Broadcast, #22113 * and a few more places * to avoid object allocation --- .../scala/akka/stream/scaladsl/Graph.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 313b25a7f4..a596e2b174 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -416,15 +416,13 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends pendingCount = downstreamsRunning val elem = grab(in) + val size = out.size var idx = 0 - val itr = out.iterator - - while (itr.hasNext) { - val o = itr.next() - val i = idx + while (idx < size) { + val o = out(idx) if (!isClosed(o)) { push(o, elem) - pending(i) = true + pending(idx) = true } idx += 1 } @@ -436,12 +434,12 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends if (pendingCount == 0 && !hasBeenPulled(in)) pull(in) { + val size = out.size var idx = 0 - val itr = out.iterator - while (itr.hasNext) { - val out = itr.next() - val i = idx - setHandler(out, new OutHandler { + while (idx < size) { + val o = out(idx) + val i = idx // close over val + setHandler(o, new OutHandler { override def onPull(): Unit = { pending(i) = false pendingCount -= 1 @@ -881,10 +879,10 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[ { var idxx = 0 - val itr = in.iterator - while (itr.hasNext) { - val i = itr.next() - val idx = idxx + val size = in.size + while (idxx < size) { + val i = in(idxx) + val idx = idxx // close over val setHandler(i, new InHandler { override def onPush() = { push(out, grab(i))