Merge pull request #22268 from akka/wip-22113-iter-patriknw

use index access instad of iterator in Broadcast, #22113
This commit is contained in:
Patrik Nordwall 2017-02-21 14:45:43 +01:00 committed by GitHub
commit bf775298f6

View file

@ -416,15 +416,13 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
pendingCount = downstreamsRunning
val elem = grab(in)
val size = out.size
var idx = 0
val itr = out.iterator
while (itr.hasNext) {
val o = itr.next()
val i = idx
while (idx < size) {
val o = out(idx)
if (!isClosed(o)) {
push(o, elem)
pending(i) = true
pending(idx) = true
}
idx += 1
}
@ -436,12 +434,12 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
if (pendingCount == 0 && !hasBeenPulled(in)) pull(in)
{
val size = out.size
var idx = 0
val itr = out.iterator
while (itr.hasNext) {
val out = itr.next()
val i = idx
setHandler(out, new OutHandler {
while (idx < size) {
val o = out(idx)
val i = idx // close over val
setHandler(o, new OutHandler {
override def onPull(): Unit = {
pending(i) = false
pendingCount -= 1
@ -881,10 +879,10 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
{
var idxx = 0
val itr = in.iterator
while (itr.hasNext) {
val i = itr.next()
val idx = idxx
val size = in.size
while (idxx < size) {
val i = in(idxx)
val idx = idxx // close over val
setHandler(i, new InHandler {
override def onPush() = {
push(out, grab(i))