From 96a84ca89ba81ca625cccca3f57449d432a6218b Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 13 Aug 2025 20:04:00 +0100 Subject: [PATCH] use better byte buffer cleaner (#2020) * use better byte buffer cleaner * scalafmt * Update ByteBufferCleaner.java * Update actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update DirectByteBufferPool.scala * method handle * Create ByteBufferCleanerSpec.scala * invokeExact * Update ByteBufferCleaner.java * test buffer cleaner at init time --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- LICENSE | 5 + .../pekko/io/ByteBufferCleanerSpec.scala | 34 ++++ .../apache/pekko/io/ByteBufferCleaner.java | 146 ++++++++++++++++++ .../pekko/io/DirectByteBufferPool.scala | 29 +--- 4 files changed, 193 insertions(+), 21 deletions(-) create mode 100644 actor-tests/src/test/scala/org/apache/pekko/io/ByteBufferCleanerSpec.scala create mode 100644 actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java diff --git a/LICENSE b/LICENSE index a6e6defab8..be35cdd3ab 100644 --- a/LICENSE +++ b/LICENSE @@ -317,6 +317,11 @@ Copyright 2015 Ben Manes. All Rights Reserved. --------------- +pekko-actor contains code in `org.apache.pekko.io.ByteBufferCleaner` which was based on code +from Apache commons-io which was developed under the Apache 2.0 license. + +--------------- + pekko-cluster contains VectorClock.scala which is derived from code written by Coda Hale . He has agreed to allow us to use this code under an Apache 2.0 license diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/ByteBufferCleanerSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/ByteBufferCleanerSpec.scala new file mode 100644 index 0000000000..241dbe4cd4 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/io/ByteBufferCleanerSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.io + +import java.nio.ByteBuffer + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class ByteBufferCleanerSpec extends AnyWordSpec with Matchers { + + "ByteBufferCleaner" should { + "be able to clean direct byte buffers" in { + val buffer = ByteBuffer.allocateDirect(1) + ByteBufferCleaner.isSupported shouldBe true + ByteBufferCleaner.clean(buffer) // This should not throw an exception + } + } +} diff --git a/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java b/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java new file mode 100644 index 0000000000..c49394640b --- /dev/null +++ b/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.io; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import static java.lang.invoke.MethodType.methodType; + +/** + * Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be + * cleaned eventually upon garbage collection. However, this should not be relied upon since it may + * not occur in a timely fashion - especially since off heap ByteBuffers don't put pressure on the + * garbage collector. + * + *

Warning: Do not attempt to use a direct {@link ByteBuffer} that has been + * cleaned or bad things will happen. Don't use this class unless you can ensure that the cleaned + * buffer will not be accessed anymore. + * + *

See JDK-4724038 + */ +final class ByteBufferCleaner { + + // adapted from + // https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java + + private interface Cleaner { + void clean(ByteBuffer buffer) throws Throwable; + } + + private static final class Java8Cleaner implements Cleaner { + + private final Method cleanerMethod; + private final Method cleanMethod; + + private Java8Cleaner() throws ReflectiveOperationException { + cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean"); + cleanerMethod = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner"); + } + + @Override + public void clean(final ByteBuffer buffer) throws Throwable { + final Object cleaner = cleanerMethod.invoke(buffer); + if (cleaner != null) { + cleanMethod.invoke(cleaner); + } + } + } + + private static final class Java9Cleaner implements Cleaner { + + private final MethodHandle invokeCleaner; + + private Java9Cleaner() throws ReflectiveOperationException { + final Class unsafeClass = Class.forName("sun.misc.Unsafe"); + final Field field = unsafeClass.getDeclaredField("theUnsafe"); + field.setAccessible(true); + final Object theUnsafe = field.get(null); + MethodHandles.Lookup lookup = MethodHandles.lookup(); + MethodHandle invokeCleaner = + lookup.findVirtual( + unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class)); + this.invokeCleaner = invokeCleaner.bindTo(theUnsafe); + } + + @Override + public void clean(final ByteBuffer buffer) throws Throwable { + invokeCleaner.invokeExact(buffer); + } + } + + private static final Cleaner INSTANCE = getCleaner(); + + /** + * Releases memory held by the given {@link ByteBuffer}. + * + * @param buffer to release. + * @throws IllegalStateException on internal failure. + */ + static void clean(final ByteBuffer buffer) { + try { + INSTANCE.clean(buffer); + } catch (final Throwable t) { + throw new IllegalStateException("Failed to clean direct buffer.", t); + } + } + + private static Cleaner getCleaner() { + Cleaner cleaner = null; + try { + cleaner = new Java8Cleaner(); + } catch (final Exception e) { + try { + cleaner = new Java9Cleaner(); + } catch (final Exception e1) { + System.err.println( + "WARNING: Failed to initialize a ByteBuffer Cleaner. This means " + + "direct ByteBuffers will only be cleaned upon garbage collection. Reason: " + + e1); + } + } + if (cleaner != null) { + try { + ByteBuffer testByteBuffer = ByteBuffer.allocateDirect(1); + cleaner.clean(testByteBuffer); + } catch (final Throwable t) { + cleaner = null; + System.err.println( + "WARNING: ByteBuffer Cleaner failed to clean a test buffer. ByteBuffer Cleaner " + + "has been disabled. This means direct ByteBuffers will only be cleaned upon garbage collection. " + + "Reason: " + + t); + } + } + return cleaner; + } + + /** + * Tests if were able to load a suitable cleaner for the current JVM. Attempting to call {@code + * ByteBufferCleaner#clean(ByteBuffer)} when this method returns false will result in an + * exception. + * + * @return {@code true} if cleaning is supported, {@code false} otherwise. + */ + static boolean isSupported() { + return INSTANCE != null; + } +} diff --git a/actor/src/main/scala/org/apache/pekko/io/DirectByteBufferPool.scala b/actor/src/main/scala/org/apache/pekko/io/DirectByteBufferPool.scala index f19a583c30..50595a4d0a 100644 --- a/actor/src/main/scala/org/apache/pekko/io/DirectByteBufferPool.scala +++ b/actor/src/main/scala/org/apache/pekko/io/DirectByteBufferPool.scala @@ -87,24 +87,6 @@ private[pekko] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries /** INTERNAL API */ private[pekko] object DirectByteBufferPool { - private val CleanDirectBuffer: ByteBuffer => Unit = - try { - val cleanerMethod = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner") - cleanerMethod.setAccessible(true) - - val cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean") - cleanMethod.setAccessible(true) - - { (bb: ByteBuffer) => - try if (bb.isDirect) { - val cleaner = cleanerMethod.invoke(bb) - cleanMethod.invoke(cleaner) - } - catch { - case NonFatal(_) => /* ok, best effort attempt to cleanup failed */ - } - } - } catch { case NonFatal(_) => _ => () /* reflection failed, use no-op fallback */ } /** * DirectByteBuffers are garbage collected by using a phantom reference and a @@ -113,8 +95,13 @@ private[pekko] object DirectByteBufferPool { * immediately after discarding all references to a DirectByteBuffer, it's * easy to OutOfMemoryError yourself using DirectByteBuffers. This function * explicitly calls the Cleaner method of a DirectByteBuffer. - * - * Utilizes reflection to avoid dependency to `sun.misc.Cleaner`. */ - def tryCleanDirectByteBuffer(byteBuffer: ByteBuffer): Unit = CleanDirectBuffer(byteBuffer) + def tryCleanDirectByteBuffer(byteBuffer: ByteBuffer): Unit = { + try { + if (byteBuffer.isDirect() && ByteBufferCleaner.isSupported) + ByteBufferCleaner.clean(byteBuffer) + } catch { + case NonFatal(_) => /* ok, best effort attempt to cleanup failed */ + } + } }