use better byte buffer cleaner (#2020)

* use better byte buffer cleaner

* scalafmt

* Update ByteBufferCleaner.java

* Update actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update DirectByteBufferPool.scala

* method handle

* Create ByteBufferCleanerSpec.scala

* invokeExact

* Update ByteBufferCleaner.java

* test buffer cleaner at init time

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
PJ Fanning 2025-08-13 20:04:00 +01:00 committed by GitHub
parent e7e71a3adf
commit 96a84ca89b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 193 additions and 21 deletions

View file

@ -317,6 +317,11 @@ Copyright 2015 Ben Manes. All Rights Reserved.
--------------- ---------------
pekko-actor contains code in `org.apache.pekko.io.ByteBufferCleaner` which was based on code
from Apache commons-io which was developed under the Apache 2.0 license.
---------------
pekko-cluster contains VectorClock.scala which is derived from code written pekko-cluster contains VectorClock.scala which is derived from code written
by Coda Hale <https://github.com/codahale/vlock>. by Coda Hale <https://github.com/codahale/vlock>.
He has agreed to allow us to use this code under an Apache 2.0 license He has agreed to allow us to use this code under an Apache 2.0 license

View file

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.io
import java.nio.ByteBuffer
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class ByteBufferCleanerSpec extends AnyWordSpec with Matchers {
"ByteBufferCleaner" should {
"be able to clean direct byte buffers" in {
val buffer = ByteBuffer.allocateDirect(1)
ByteBufferCleaner.isSupported shouldBe true
ByteBufferCleaner.clean(buffer) // This should not throw an exception
}
}
}

View file

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.io;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import static java.lang.invoke.MethodType.methodType;
/**
* Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be
* cleaned eventually upon garbage collection. However, this should not be relied upon since it may
* not occur in a timely fashion - especially since off heap ByteBuffers don't put pressure on the
* garbage collector.
*
* <p><strong>Warning:</strong> Do not attempt to use a direct {@link ByteBuffer} that has been
* cleaned or bad things will happen. Don't use this class unless you can ensure that the cleaned
* buffer will not be accessed anymore.
*
* <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
*/
final class ByteBufferCleaner {
// adapted from
// https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java
private interface Cleaner {
void clean(ByteBuffer buffer) throws Throwable;
}
private static final class Java8Cleaner implements Cleaner {
private final Method cleanerMethod;
private final Method cleanMethod;
private Java8Cleaner() throws ReflectiveOperationException {
cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean");
cleanerMethod = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner");
}
@Override
public void clean(final ByteBuffer buffer) throws Throwable {
final Object cleaner = cleanerMethod.invoke(buffer);
if (cleaner != null) {
cleanMethod.invoke(cleaner);
}
}
}
private static final class Java9Cleaner implements Cleaner {
private final MethodHandle invokeCleaner;
private Java9Cleaner() throws ReflectiveOperationException {
final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
final Field field = unsafeClass.getDeclaredField("theUnsafe");
field.setAccessible(true);
final Object theUnsafe = field.get(null);
MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle invokeCleaner =
lookup.findVirtual(
unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class));
this.invokeCleaner = invokeCleaner.bindTo(theUnsafe);
}
@Override
public void clean(final ByteBuffer buffer) throws Throwable {
invokeCleaner.invokeExact(buffer);
}
}
private static final Cleaner INSTANCE = getCleaner();
/**
* Releases memory held by the given {@link ByteBuffer}.
*
* @param buffer to release.
* @throws IllegalStateException on internal failure.
*/
static void clean(final ByteBuffer buffer) {
try {
INSTANCE.clean(buffer);
} catch (final Throwable t) {
throw new IllegalStateException("Failed to clean direct buffer.", t);
}
}
private static Cleaner getCleaner() {
Cleaner cleaner = null;
try {
cleaner = new Java8Cleaner();
} catch (final Exception e) {
try {
cleaner = new Java9Cleaner();
} catch (final Exception e1) {
System.err.println(
"WARNING: Failed to initialize a ByteBuffer Cleaner. This means "
+ "direct ByteBuffers will only be cleaned upon garbage collection. Reason: "
+ e1);
}
}
if (cleaner != null) {
try {
ByteBuffer testByteBuffer = ByteBuffer.allocateDirect(1);
cleaner.clean(testByteBuffer);
} catch (final Throwable t) {
cleaner = null;
System.err.println(
"WARNING: ByteBuffer Cleaner failed to clean a test buffer. ByteBuffer Cleaner "
+ "has been disabled. This means direct ByteBuffers will only be cleaned upon garbage collection. "
+ "Reason: "
+ t);
}
}
return cleaner;
}
/**
* Tests if were able to load a suitable cleaner for the current JVM. Attempting to call {@code
* ByteBufferCleaner#clean(ByteBuffer)} when this method returns false will result in an
* exception.
*
* @return {@code true} if cleaning is supported, {@code false} otherwise.
*/
static boolean isSupported() {
return INSTANCE != null;
}
}

View file

@ -87,24 +87,6 @@ private[pekko] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries
/** INTERNAL API */ /** INTERNAL API */
private[pekko] object DirectByteBufferPool { private[pekko] object DirectByteBufferPool {
private val CleanDirectBuffer: ByteBuffer => Unit =
try {
val cleanerMethod = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner")
cleanerMethod.setAccessible(true)
val cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
cleanMethod.setAccessible(true)
{ (bb: ByteBuffer) =>
try if (bb.isDirect) {
val cleaner = cleanerMethod.invoke(bb)
cleanMethod.invoke(cleaner)
}
catch {
case NonFatal(_) => /* ok, best effort attempt to cleanup failed */
}
}
} catch { case NonFatal(_) => _ => () /* reflection failed, use no-op fallback */ }
/** /**
* DirectByteBuffers are garbage collected by using a phantom reference and a * DirectByteBuffers are garbage collected by using a phantom reference and a
@ -113,8 +95,13 @@ private[pekko] object DirectByteBufferPool {
* immediately after discarding all references to a DirectByteBuffer, it's * immediately after discarding all references to a DirectByteBuffer, it's
* easy to OutOfMemoryError yourself using DirectByteBuffers. This function * easy to OutOfMemoryError yourself using DirectByteBuffers. This function
* explicitly calls the Cleaner method of a DirectByteBuffer. * explicitly calls the Cleaner method of a DirectByteBuffer.
*
* Utilizes reflection to avoid dependency to `sun.misc.Cleaner`.
*/ */
def tryCleanDirectByteBuffer(byteBuffer: ByteBuffer): Unit = CleanDirectBuffer(byteBuffer) def tryCleanDirectByteBuffer(byteBuffer: ByteBuffer): Unit = {
try {
if (byteBuffer.isDirect() && ByteBufferCleaner.isSupported)
ByteBufferCleaner.clean(byteBuffer)
} catch {
case NonFatal(_) => /* ok, best effort attempt to cleanup failed */
}
}
} }