From 3cfe37f016ae2397c375fcfd267323f9ef85fa55 Mon Sep 17 00:00:00 2001
From: "He-Pin(kerr)"
Date: Thu, 25 Sep 2025 08:50:27 +0800
Subject: [PATCH] chore: Use array list for better performance in BroadcastHub (#2262)

* chore: Use array list for better performance in BroadcastHub

* chore: add benchmark
---
Reviewer notes (this text sits between the "---" separator and the first
"diff" line, so it is not recorded in the commit message):

 * Line breaks and indentation of this patch were reconstructed from a copy
   in which they had been collapsed. Context and removed lines in Hub.scala
   use the indentation implied by the code nesting; if a hunk does not
   match, retry with `git am --ignore-whitespace`.
 * Hub.scala: the `consumerWheel` slots change from immutable
   `List[Consumer]` values that were re-assigned on every add/remove to
   `java.util.ArrayList[Consumer]` instances that are mutated in place.
   `Array.fill` takes its element by name, so every slot receives its own
   list instance.
 * `addConsumer` used to prepend (`consumer :: consumerWheel(slot)`) and now
   appends (`consumerWheel(slot).add(consumer)`), so the order of consumers
   inside a slot changes. NOTE(review): confirm nothing depends on it.
 * `findAndRemoveConsumer` still returns `null` when no consumer with the
   given id is registered in the slot.
 * `new util.ArrayList[Consumer]()` needs `java.util` in scope under the
   name `util`, and this patch adds no import to Hub.scala.
   NOTE(review): confirm the import already exists.
 * BroadcastHubBenchmark materializes one `BroadcastHub.sink` per
   invocation, attaches `parallelism` `LatchSink` consumers and waits up to
   30 seconds on the latch before failing with a debug dump.

 .../pekko/stream/BroadcastHubBenchmark.scala  | 94 +++++++++++++++++++
 .../apache/pekko/stream/scaladsl/Hub.scala    | 29 +++---
 2 files changed, 109 insertions(+), 14 deletions(-)
 create mode 100644 bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
new file mode 100644
index 0000000000..ede811c798
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.NotUsed
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.remote.artery.{ BenchTestSource, LatchSink }
+import org.apache.pekko.stream.scaladsl._
+import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
+import org.openjdk.jmh.annotations._
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object BroadcastHubBenchmark {
+  final val OperationsPerInvocation = 100000
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class BroadcastHubBenchmark {
+  import BroadcastHubBenchmark._
+
+  val config = ConfigFactory.parseString("""
+    pekko.actor.default-dispatcher {
+      executor = "fork-join-executor"
+      fork-join-executor {
+        parallelism-factor = 1
+      }
+    }
+    """)
+
+  implicit val system: ActorSystem = ActorSystem("BroadcastHubBenchmark", config)
+  import system.dispatcher
+
+  var testSource: Source[java.lang.Integer, NotUsed] = _
+
+  @Param(Array("64", "256"))
+  var parallelism = 0
+
+  @Setup
+  def setup(): Unit = {
+    // eager init of materializer
+    SystemMaterializer(system).materializer
+    testSource = Source.fromGraph(new BenchTestSource(OperationsPerInvocation))
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def broadcast(): Unit = {
+    val latch = new CountDownLatch(parallelism)
+    val broadcastSink =
+      BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, startAfterNrOfConsumers = parallelism)
+    val sink = new LatchSink(OperationsPerInvocation, latch)
+    val source = testSource.runWith(broadcastSink)
+    var idx = 0
+    while (idx < parallelism) {
+      source.runWith(sink)
+      idx += 1
+    }
+    awaitLatch(latch)
+  }
+
+  private def awaitLatch(latch: CountDownLatch): Unit = {
+    if (!latch.await(30, TimeUnit.SECONDS)) {
+      StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
+      throw new RuntimeException("Latch didn't complete in time")
+    }
+  }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 563b630e98..7a8adf7c53 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -545,7 +545,8 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      * a wakeup and update their position at the same time.
      *
      */
-    private[this] val consumerWheel = Array.fill[List[Consumer]](bufferSize * 2)(Nil)
+    private[this] val consumerWheel =
+      Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new util.ArrayList[Consumer]())
     private[this] var activeConsumers = 0
 
     override def preStart(): Unit = {
@@ -653,8 +654,10 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       }
 
       // Notify registered consumers
-      consumerWheel.iterator.flatMap(_.iterator).foreach { consumer =>
-        consumer.callback.invoke(failMessage)
+      var idx = 0
+      while (idx < consumerWheel.length) {
+        consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+        idx += 1
       }
       failStage(ex)
     }
@@ -668,18 +671,16 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
     private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
       // TODO: Try to eliminate modulo division somehow...
       val wheelSlot = offset & WheelMask
-      var consumersInSlot = consumerWheel(wheelSlot)
-      // debug(s"consumers before removal $consumersInSlot")
-      var remainingConsumersInSlot: List[Consumer] = Nil
+      val consumersInSlot = consumerWheel(wheelSlot)
       var removedConsumer: Consumer = null
-
-      while (consumersInSlot.nonEmpty) {
-        val consumer = consumersInSlot.head
-        if (consumer.id != id) remainingConsumersInSlot = consumer :: remainingConsumersInSlot
-        else removedConsumer = consumer
-        consumersInSlot = consumersInSlot.tail
+      if (consumersInSlot.size() > 0) {
+        consumersInSlot.removeIf(consumer => {
+          if (consumer.id == id) {
+            removedConsumer = consumer
+            true
+          } else false
+        })
       }
-      consumerWheel(wheelSlot) = remainingConsumersInSlot
       removedConsumer
     }
 
@@ -710,7 +711,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
 
     private def addConsumer(consumer: Consumer, offset: Int): Unit = {
       val slot = offset & WheelMask
-      consumerWheel(slot) = consumer :: consumerWheel(slot)
+      consumerWheel(slot).add(consumer)
     }
 
     /*