diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/simplify-forkjoin.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/simplify-forkjoin.excludes new file mode 100644 index 0000000000..b01dec911c --- /dev/null +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/simplify-forkjoin.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# No longer need special code to support forkjoin dispatcher on JDK 8 +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ForkJoinExecutorConfigurator#PekkoForkJoinPool.this") diff --git a/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala b/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala deleted file mode 100644 index 8e089d16a3..0000000000 --- a/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pekko.dispatch - -import org.apache.pekko -import pekko.annotation.InternalApi -import pekko.dispatch.ForkJoinExecutorConfigurator.PekkoForkJoinTask - -import java.util.concurrent.{ ForkJoinPool, ForkJoinTask, TimeUnit } - -/** - * INTERNAL PEKKO USAGE ONLY - * - * An alternative version of [[ForkJoinExecutorConfigurator.PekkoForkJoinPool]] - * that supports the `maximumPoolSize` feature available in [[java.util.concurrent.ForkJoinPool]] in JDK9+. - */ -@InternalApi -private[dispatch] final class PekkoJdk9ForkJoinPool( - parallelism: Int, - threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, - maximumPoolSize: Int, - unhandledExceptionHandler: Thread.UncaughtExceptionHandler, - asyncMode: Boolean) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode, - 0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS) - with LoadMetrics { - - override def execute(r: Runnable): Unit = - if (r ne null) - super.execute( - (if (r.isInstanceOf[ForkJoinTask[_]]) r else new PekkoForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) - else - throw new NullPointerException("Runnable was null") - - def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() -} diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index e56b67bc4b..36d9820ae4 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -18,9 +18,7 @@ import org.apache.pekko import pekko.dispatch.VirtualThreadSupport.newVirtualThreadFactory import pekko.util.JavaVersion -import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType } -import java.util.concurrent.{ Executor, ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory } -import scala.util.Try +import java.util.concurrent.{ Executor, ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory, TimeUnit } object ForkJoinExecutorConfigurator { @@ -30,15 +28,12 @@ object ForkJoinExecutorConfigurator { final class PekkoForkJoinPool( parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + maximumPoolSize: Int, unhandledExceptionHandler: Thread.UncaughtExceptionHandler, asyncMode: Boolean) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode, + 0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS) with LoadMetrics { - def this( - parallelism: Int, - threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, - unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = - this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true) override def execute(r: Runnable): Unit = if (r ne null) @@ -110,23 +105,6 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer asyncMode: Boolean, maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false) - private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] = - Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption - - private lazy val pekkoJdk9ForkJoinPoolHandleOpt: Option[MethodHandle] = { - if (JavaVersion.majorVersion == 8) { - None - } else { - pekkoJdk9ForkJoinPoolClassOpt.map { clz => - val methodHandleLookup = MethodHandles.lookup() - val mt = MethodType.methodType(classOf[Unit], classOf[Int], - classOf[ForkJoinPool.ForkJoinWorkerThreadFactory], - classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean]) - methodHandleLookup.findConstructor(clz, mt) - } - } - } - def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) @@ -139,14 +117,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } } else threadFactory - val pool = pekkoJdk9ForkJoinPoolHandleOpt match { - case Some(handle) => - // carrier Thread only exists in JDK 17+ - handle.invoke(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) - .asInstanceOf[ExecutorService with LoadMetrics] - case _ => - new PekkoForkJoinPool(parallelism, tf, MonitorableThreadFactory.doNothing, asyncMode) - } + val pool = new PekkoForkJoinPool(parallelism, tf, maxPoolSize, MonitorableThreadFactory.doNothing, asyncMode) if (virtualize && JavaVersion.majorVersion >= 21) { // when virtualized, we need enhanced thread factory diff --git a/build.sbt b/build.sbt index 8826338e61..a9b818ddc1 100644 --- a/build.sbt +++ b/build.sbt @@ -121,7 +121,7 @@ lazy val actor = pekkoModule("actor") .settings(AddMetaInfLicenseFiles.actorSettings) .settings(VersionGenerator.settings) .settings(serialversionRemoverPluginSettings) - .enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9) + .enablePlugins(BoilerplatePlugin, SbtOsgi) lazy val actorTests = pekkoModule("actor-tests") .configs(Jdk9.TestJdk9)