From ede151b52e355691230d45b724ffde35865db62e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 11 Sep 2015 17:02:04 +0200 Subject: [PATCH] =ben #15693 Sync benchmarks from release-2.3 branch --- .../scala/akka/actor/TellOnlyBenchmark.scala | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala diff --git a/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala new file mode 100644 index 0000000000..c41ced8113 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/TellOnlyBenchmark.scala @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.actor + +import akka.dispatch._ +import akka.testkit.TestProbe +import akka.util.Helpers.ConfigOps +import com.typesafe.config.{ Config, ConfigFactory } +import java.util.concurrent.TimeUnit +import org.openjdk.jmh.annotations._ +import scala.concurrent.duration._ +import scala.concurrent.Await + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.SingleShotTime)) +@Fork(5) +@Threads(1) +@Warmup(iterations = 10, batchSize = TellOnlyBenchmark.numMessages) +@Measurement(iterations = 10, batchSize = TellOnlyBenchmark.numMessages) +class TellOnlyBenchmark { + import TellOnlyBenchmark._ + + implicit var system: ActorSystem = _ + + @Setup(Level.Trial) + def setup() { + system = ActorSystem("TellOnlyBenchmark", ConfigFactory.parseString( + s"""| akka { + | log-dead-letters = off + | actor { + | default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 1 + | parallelism-max = 4 + | } + | throughput = 1 + | } + | } + | } + | dropping-dispatcher { + | fork-join-executor.parallelism-min = 1 + | fork-join-executor.parallelism-max = 1 + | type = "akka.actor.TellOnlyBenchmark$$DroppingDispatcherConfigurator" + | mailbox-type = "akka.actor.TellOnlyBenchmark$$UnboundedDroppingMailbox" + | } + | """.stripMargin)) + } + + @TearDown(Level.Trial) + def shutdown() { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + var actor: ActorRef = _ + var probe: TestProbe = _ + + @Setup(Level.Iteration) + def setupIteration() { + actor = system.actorOf(Props[TellOnlyBenchmark.Echo].withDispatcher("dropping-dispatcher")) + probe = TestProbe() + probe.watch(actor) + probe.send(actor, message) + probe.expectMsg(message) + probe.send(actor, flipDrop) + probe.expectNoMsg(200.millis) + System.gc() + } + + @TearDown(Level.Iteration) + def shutdownIteration() { + probe.send(actor, flipDrop) + probe.expectNoMsg(200.millis) + actor ! stop + probe.expectTerminated(actor, timeout) + actor = null + probe = null + } + + @Benchmark + @OutputTimeUnit(TimeUnit.MICROSECONDS) + def tell() = { + probe.send(actor, message) + } +} + +object TellOnlyBenchmark { + final val stop = "stop" + final val message = "message" + final val flipDrop = "flipDrop" + final val timeout = 5.seconds + final val numMessages = 1000000 + + class Echo extends Actor { + def receive = { + case s @ `stop` ⇒ + context stop self + case m ⇒ sender ! m + } + } + + class DroppingMessageQueue extends UnboundedMailbox.MessageQueue { + @volatile var dropping = false + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + if (handle.message == flipDrop) dropping = !dropping + else if (!dropping) super.enqueue(receiver, handle) + } + } + + case class UnboundedDroppingMailbox() extends MailboxType with ProducesMessageQueue[DroppingMessageQueue] { + + def this(settings: ActorSystem.Settings, config: Config) = this() + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new DroppingMessageQueue + } + + class DroppingDispatcher( + _configurator: MessageDispatcherConfigurator, + _id: String, + _throughput: Int, + _throughputDeadlineTime: Duration, + _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, + _shutdownTimeout: FiniteDuration) + extends Dispatcher(_configurator, _id, _throughput, _throughputDeadlineTime, _executorServiceFactoryProvider, _shutdownTimeout) { + + override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { + val mbox = receiver.mailbox + mbox.enqueue(receiver.self, invocation) + mbox.messageQueue match { + case mb: DroppingMessageQueue if mb.dropping ⇒ // do nothing + case _ ⇒ registerForExecution(mbox, true, false) + } + } + } + + class DroppingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + override def dispatcher(): MessageDispatcher = new DroppingDispatcher( + this, + config.getString("id"), + config.getInt("throughput"), + config.getNanosDuration("throughput-deadline-time"), + configureExecutor(), + config.getMillisDuration("shutdown-timeout")) + } +}