From 4bcfc9c7e2f4ffdb3f7a2303c59e95dd73bfb09d Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 1 Aug 2025 09:24:05 +0100 Subject: [PATCH] Remove use of sun.misc.Unsafe in Mailbox (#1894) * remove use of Unsafe in Mailbox * scalafmt * use varhandles * remove cast * Update Mailbox.scala --- .sbt-java-formatter.conf | 1 - .../pekko/dispatch/AbstractMailbox.java | 41 +++++++++++++------ .../org/apache/pekko/dispatch/Mailbox.scala | 25 +++++------ 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/.sbt-java-formatter.conf b/.sbt-java-formatter.conf index eaf139d7c8..22dbb93843 100644 --- a/.sbt-java-formatter.conf +++ b/.sbt-java-formatter.conf @@ -15,7 +15,6 @@ ignored-files = [ //pekko-actor "OnSpinWait.java", "AbstractBoundedNodeQueue.java", - "AbstractMailbox.java", "AbstractMessageDispatcher.java", "AbstractNodeQueue.java", //pekko-cluster diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractMailbox.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractMailbox.java index cb08254e51..df836b8cee 100644 --- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractMailbox.java +++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractMailbox.java @@ -1,28 +1,45 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pekko.dispatch; -import org.apache.pekko.util.Unsafe; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +import org.apache.pekko.dispatch.sysmsg.SystemMessage; final class AbstractMailbox { - final static long mailboxStatusOffset; - final static long systemMessageOffset; + final static VarHandle mailboxStatusHandle; + final static VarHandle systemMessageHandle; static { try { - mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_statusDoNotCallMeDirectly")); - systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueueDoNotCallMeDirectly")); + MethodHandles.Lookup lookup = + MethodHandles.privateLookupIn(Mailbox.class, MethodHandles.lookup()); + mailboxStatusHandle = + lookup.findVarHandle( + Mailbox.class, + "_statusDoNotCallMeDirectly", + int.class); + systemMessageHandle = + lookup.findVarHandle( + Mailbox.class, + "_systemQueueDoNotCallMeDirectly", + SystemMessage.class); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala index f8df5240a5..6eb61bed96 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala @@ -17,7 +17,7 @@ import java.util.{ Comparator, Deque, PriorityQueue, Queue } import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock -import scala.annotation.{ nowarn, tailrec } +import scala.annotation.tailrec import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.control.NonFatal import com.typesafe.config.Config @@ -26,7 +26,7 @@ import pekko.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActor import pekko.annotation.InternalStableApi import pekko.dispatch.sysmsg._ import pekko.event.Logging.Error -import pekko.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe } +import pekko.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue } import pekko.util.Helpers.ConfigOps /** @@ -118,8 +118,7 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) @volatile protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ // null by default - final def currentStatus: Mailbox.Status = - Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset): @nowarn("cat=deprecation") + final def currentStatus: Mailbox.Status = AbstractMailbox.mailboxStatusHandle.get(this) final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0 @@ -132,11 +131,10 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) final def isScheduled: Boolean = (currentStatus & Scheduled) != 0 protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = - Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus): @nowarn( - "cat=deprecation") + AbstractMailbox.mailboxStatusHandle.compareAndSet(this, oldStatus, newStatus) protected final def setStatus(newStatus: Status): Unit = - Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus): @nowarn("cat=deprecation") + AbstractMailbox.mailboxStatusHandle.set(this, newStatus) /** * Reduce the suspend count by one. Caller does not need to worry about whether @@ -208,17 +206,14 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) protected final def systemQueueGet: LatestFirstSystemMessageList = // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such // it just exists as a typed view during compile-time. The actual return type is still SystemMessage. - new LatestFirstSystemMessageList( - Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[ - SystemMessage]): @nowarn("cat=deprecation") + new LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.get(this)) protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean = (_old.head eq _new.head) || - // Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new - // are SystemMessage instances hidden during compile time behind the SystemMessageList value class. - // Without calling .head the parameters would be boxed in SystemMessageList wrapper. - Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head): @nowarn( - "cat=deprecation") + // Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new + // are SystemMessage instances hidden during compile time behind the SystemMessageList value class. + // Without calling .head the parameters would be boxed in SystemMessageList wrapper. + AbstractMailbox.systemMessageHandle.compareAndSet(this, _old.head, _new.head) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = currentStatus match {