From 9b97614b959a37dbffb3b4579b258d3e0d2e7055 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johan=20Andr=C3=A9n?=
Date: Thu, 27 Jan 2022 16:14:29 +0100
Subject: [PATCH] JDK 17 CI nightly job (#31085)

---
 .github/workflows/nightly-builds.yml          |  5 +-
 .../akka/actor/CoordinatedShutdownSpec.scala  | 92 ++++++++++++-------
 .../non-jdk-intrusive-shutdown-tests.excludes |  2 +
 .../akka/actor/CoordinatedShutdown.scala      | 54 ++++++++---
 .../ReplicatedDataSerializerSpec.scala        | 41 ---------
 project/AkkaBuild.scala                       |  3 +-
 project/JdkOptions.scala                      | 10 ++
 project/MultiNode.scala                       |  2 +-
 8 files changed, 116 insertions(+), 93 deletions(-)
 create mode 100644 akka-actor/src/main/mima-filters/2.6.18.backward.excludes/non-jdk-intrusive-shutdown-tests.excludes

diff --git a/.github/workflows/nightly-builds.yml b/.github/workflows/nightly-builds.yml
index ceef63d1fe..e2a830d630 100644
--- a/.github/workflows/nightly-builds.yml
+++ b/.github/workflows/nightly-builds.yml
@@ -3,6 +3,7 @@ name: Nightly Builds
 on:
   schedule:
     - cron: "0 0 * * *"
+  workflow_dispatch:
 
 jobs:
 
@@ -120,12 +121,14 @@
         # binary version is required and Akka build will set the right
         # full version from it.
         scalaVersion: ["2.12", "2.13"]
-        jdkVersion: ["adopt@1.8.0", "adopt@1.11"]
+        jdkVersion: ["adopt@1.8.0", "adopt@1.11", "openjdk@1.17.0"]
         include:
           - jdkVersion: adopt@1.8.0
             extraOpts: "-Dmultinode.XX:+PrintGCDetails -Dmultinode.XX:+PrintGCTimeStamps"
           - jdkVersion: adopt@1.11
            extraOpts: "-Dmultinode.Xlog:gc"
+          - jdkVersion: openjdk@1.17.0
+            extraOpts: "-Dmultinode.Xlog:gc"
     steps:
       - name: Checkout
         uses: actions/checkout@v2
diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala
index e7fb0aec13..1f7abc444c 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala
@@ -4,21 +4,25 @@
 
 package akka.actor
 
-import java.util
-import java.util.concurrent.{ Executors, TimeoutException }
-
-import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
-import scala.concurrent.duration._
-
-import com.typesafe.config.{ Config, ConfigFactory }
-
 import akka.ConfigurationException
 import akka.Done
 import akka.actor.CoordinatedShutdown.Phase
 import akka.actor.CoordinatedShutdown.UnknownReason
 import akka.dispatch.ExecutionContexts
-import akka.testkit.{ AkkaSpec, EventFilter, TestKit, TestProbe }
-import akka.util.ccompat.JavaConverters._
+import akka.testkit.AkkaSpec
+import akka.testkit.EventFilter
+import akka.testkit.TestKit
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeoutException
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
 
 class CoordinatedShutdownSpec
     extends AkkaSpec(ConfigFactory.parseString("""
@@ -573,9 +577,9 @@ class CoordinatedShutdownSpec
         akka.coordinated-shutdown.run-by-actor-system-terminate = off
         """)
 
-      override def withSystemRunning(newSystem: ActorSystem): Unit = {
+      override def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
         val cancellable =
-          CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
+          coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
         myHooksCount should ===(1) // one user, none from system
         cancellable.cancel()
       }
@@ -589,9 +593,9 @@ class CoordinatedShutdownSpec
         akka.coordinated-shutdown.run-by-actor-system-terminate = off
         """)
 
-      override def withSystemRunning(newSystem: ActorSystem): Unit = {
+      override def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
         val cancellable =
-          CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
+          coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
 
         myHooksCount should ===(2) // one user, one from system
         cancellable.cancel()
@@ -605,9 +609,9 @@ class CoordinatedShutdownSpec
         akka.coordinated-shutdown.terminate-actor-system = on
         """)
 
-      def withSystemRunning(newSystem: ActorSystem): Unit = {
+      def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
         val cancellable =
-          CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
+          coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
         myHooksCount should ===(2) // one user, one from actor system
         cancellable.cancel()
       }
@@ -620,9 +624,9 @@ class CoordinatedShutdownSpec
         akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
         """)
 
-      def withSystemRunning(newSystem: ActorSystem): Unit = {
+      def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
         val cancellable =
-          CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
+          coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
         myHooksCount should ===(1) // one user, none from actor system
         cancellable.cancel()
       }
@@ -635,7 +639,7 @@ class CoordinatedShutdownSpec
         akka.coordinated-shutdown.terminate-actor-system = on
         """)
 
-      def withSystemRunning(newSystem: ActorSystem): Unit = {
+      def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
         TestKit.shutdownActorSystem(newSystem)
 
         CoordinatedShutdown(newSystem)
@@ -740,32 +744,50 @@ class CoordinatedShutdownSpec
 
   abstract class JvmHookTest {
 
-    private val initialHookCount = trixyTrixCountJvmHooks(systemName)
-    initialHookCount should ===(0)
-
     def systemName: String
     def systemConfig: Config
-    def withSystemRunning(system: ActorSystem): Unit
+    def withSystemRunning(system: ActorSystem, cs: CoordinatedShutdown): Unit
 
-    val newSystem = ActorSystem(systemName, systemConfig)
+    private val newSystem =
+      ActorSystem(systemName, systemConfig.withFallback(system.settings.config)).asInstanceOf[ExtendedActorSystem]
+    private var shutdownHooks = Set.empty[Thread]
+    private val mockRuntime = new JVMShutdownHooks {
+      override def addHook(t: Thread): Unit = synchronized {
+        // mimic validation in JDK ApplicationShutdownHooks
+        if (shutdownHooks == null)
+          throw new IllegalStateException("Shutdown in progress");
 
-    withSystemRunning(newSystem)
+        if (t.isAlive())
+          throw new IllegalArgumentException("Hook already running");
 
-    TestKit.shutdownActorSystem(newSystem)
+        if (shutdownHooks.contains(t))
+          throw new IllegalArgumentException("Hook previously registered");
 
-    trixyTrixCountJvmHooks(systemName) should ===(0)
+        shutdownHooks += t
+      }
 
-    protected def myHooksCount: Int = trixyTrixCountJvmHooks(systemName)
+      override def removeHook(t: Thread): Boolean = synchronized {
+        // mimic validation in JDK ApplicationShutdownHooks
+        if (t == null)
+          throw new NullPointerException();
 
-    private def trixyTrixCountJvmHooks(systemName: String): Int = {
-      val clazz = Class.forName("java.lang.ApplicationShutdownHooks")
-      val field = clazz.getDeclaredField("hooks")
-      field.setAccessible(true)
-      clazz.synchronized {
-        val hooks = field.get(null).asInstanceOf[util.IdentityHashMap[Thread, Thread]]
-        hooks.values().asScala.count(_.getName.startsWith(systemName))
+        if (shutdownHooks.contains(t)) {
+          shutdownHooks -= t
+          true
+        } else false
       }
     }
+    private val csConfig = newSystem.settings.config.getConfig("akka.coordinated-shutdown")
+    // pretend extension creation and start
+    private val cs = new CoordinatedShutdown(newSystem, CoordinatedShutdown.phasesFromConfig(csConfig), mockRuntime)
+    CoordinatedShutdown.init(newSystem, csConfig, cs)
+
+    withSystemRunning(newSystem, cs)
+
+    TestKit.shutdownActorSystem(newSystem)
+    shutdownHooks should have size (0)
+
+    protected def myHooksCount: Int = synchronized(shutdownHooks.size)
   }
 }
diff --git a/akka-actor/src/main/mima-filters/2.6.18.backward.excludes/non-jdk-intrusive-shutdown-tests.excludes b/akka-actor/src/main/mima-filters/2.6.18.backward.excludes/non-jdk-intrusive-shutdown-tests.excludes
new file mode 100644
index 0000000000..52941622a1
--- /dev/null
+++ b/akka-actor/src/main/mima-filters/2.6.18.backward.excludes/non-jdk-intrusive-shutdown-tests.excludes
@@ -0,0 +1,2 @@
+# internal constructor of CoordinatedShutdown changed for testability
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown.this")
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala
index 6bdefd9157..375a4ad0bb 100644
--- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala
+++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala
@@ -188,6 +188,24 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
     val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
     val phases = phasesFromConfig(conf)
     val coord = new CoordinatedShutdown(system, phases)
+    init(system, conf, coord)
+    coord
+  }
+
+  // locate reason-specific overrides and merge with defaults.
+  @InternalApi private[akka] def confWithOverrides(conf: Config, reason: Option[Reason]): Config = {
+    reason
+      .flatMap { r =>
+        val basePath = s"""reason-overrides."${r.getClass.getName}""""
+        if (conf.hasPath(basePath)) Some(conf.getConfig(basePath).withFallback(conf)) else None
+      }
+      .getOrElse(conf)
+  }
+
+  /** INTERNAL API */
+  @InternalApi
+  private[akka] def init(system: ExtendedActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
+    // separated for testability
     initPhaseActorSystemTerminate(system, conf, coord)
     initJvmHook(system, conf, coord)
     // Avoid leaking actor system references when system is terminated before JVM is #23384
@@ -206,17 +224,6 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
     catch {
       case _: RejectedExecutionException => cleanupActorSystemJvmHook()
     }
-    coord
-  }
-
-  // locate reason-specific overrides and merge with defaults.
-  @InternalApi private[akka] def confWithOverrides(conf: Config, reason: Option[Reason]): Config = {
-    reason
-      .flatMap { r =>
-        val basePath = s"""reason-overrides."${r.getClass.getName}""""
-        if (conf.hasPath(basePath)) Some(conf.getConfig(basePath).withFallback(conf)) else None
-      }
-      .getOrElse(conf)
-  }
 
   private def initPhaseActorSystemTerminate(
@@ -348,9 +355,28 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
 
 }
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] trait JVMShutdownHooks {
+  def addHook(t: Thread): Unit
+  def removeHook(t: Thread): Boolean
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] object JVMShutdownHooks extends JVMShutdownHooks {
+  override def addHook(t: Thread): Unit = Runtime.getRuntime.addShutdownHook(t)
+  override def removeHook(t: Thread): Boolean = Runtime.getRuntime.removeShutdownHook(t)
+}
+
 final class CoordinatedShutdown private[akka] (
     system: ExtendedActorSystem,
-    phases: Map[String, CoordinatedShutdown.Phase])
+    phases: Map[String, CoordinatedShutdown.Phase],
+    jvmShutdownHooks: JVMShutdownHooks = JVMShutdownHooks)
     extends Extension {
   import CoordinatedShutdown.{ Reason, UnknownReason }
 
@@ -817,12 +843,12 @@ final class CoordinatedShutdown private[akka] (
       }
       thread.setName(s"${system.name}-shutdown-hook-${newLatch.getCount}")
       try {
-        Runtime.getRuntime.addShutdownHook(thread)
+        jvmShutdownHooks.addHook(thread)
         new Cancellable {
           @volatile var cancelled = false
           def cancel(): Boolean = {
             try {
-              if (Runtime.getRuntime.removeShutdownHook(thread)) {
+              if (jvmShutdownHooks.removeHook(thread)) {
                 cancelled = true
                 _jvmHooksLatch.get.countDown()
                 true
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index af1b0ae20c..9fb41cbc3a 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -4,8 +4,6 @@
 
 package akka.cluster.ddata.protobuf
 
-import java.util.Base64
-
 import com.typesafe.config.ConfigFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.matchers.should.Matchers
@@ -57,17 +55,6 @@ class ReplicatedDataSerializerSpec
     shutdown()
   }
 
-  /**
-   * Given a blob created with the previous serializer (with only string keys for maps). If we deserialize it and then
-   * serialize it again and arrive at the same BLOB we can assume that we are compatible in both directions.
-   */
-  def checkCompatibility(oldBlobAsBase64: String, obj: AnyRef): Unit = {
-    val oldBlob = Base64.getDecoder.decode(oldBlobAsBase64)
-    val deserialized = serializer.fromBinary(oldBlob, serializer.manifest(obj))
-    val newBlob = serializer.toBinary(deserialized)
-    newBlob should equal(oldBlob)
-  }
-
   def checkSerialization(obj: AnyRef): Int = {
     val blob = serializer.toBinary(obj)
     val ref = serializer.fromBinary(blob, serializer.manifest(obj))
@@ -262,13 +249,6 @@ class ReplicatedDataSerializerSpec
       checkSerialization(ORMap().put(address1, Flag(), GSet() + "A").delta.get)
     }
 
-    "be compatible with old ORMap serialization" in {
-      // Below blob was created with previous version of the serializer
-      val oldBlobAsBase64 =
-        "H4sIAAAAAAAAAOOax8jlyaXMJc8lzMWXX5KRWqSXkV9copdflC7wXEWUiYGBQRaIGQQkuJS45LiEuHiL83NTUdQwwtWIC6kQpUqVKAulGBOlGJOE+LkYE4W4uJi5GB0FuJUYnUACSRABJ7AAAOLO3C3DAAAA"
-      checkCompatibility(oldBlobAsBase64, ORMap())
-    }
-
     "serialize LWWMap" in {
       checkSerialization(LWWMap())
       checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any]))
@@ -281,13 +261,6 @@ class ReplicatedDataSerializerSpec
         .put(address2, "b", 17, LWWRegister.defaultClock[Any]))
     }
 
-    "be compatible with old LWWMap serialization" in {
-      // Below blob was created with previous version of the serializer
-      val oldBlobAsBase64 =
-        "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC0kJEqZJiTBSy4AISxhwzrl2fuyRMiIAWKS4utrLEnNJUQwERAD96/peLAAAA"
-      checkCompatibility(oldBlobAsBase64, LWWMap())
-    }
-
     "serialize PNCounterMap" in {
       checkSerialization(PNCounterMap())
       checkSerialization(PNCounterMap().increment(address1, "a", 3))
@@ -298,13 +271,6 @@ class ReplicatedDataSerializerSpec
         PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 2).increment(address2, "b", 5))
     }
 
-    "be compatible with old PNCounterMap serialization" in {
-      // Below blob was created with previous version of the serializer
-      val oldBlobAsBase64 =
-        "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC8kJEqZJiTBTS4wISmlyqXMqE1AsxMgsxAADYQs/9gQAAAA=="
-      checkCompatibility(oldBlobAsBase64, PNCounterMap())
-    }
-
     "serialize ORMultiMap" in {
       checkSerialization(ORMultiMap())
       checkSerialization(ORMultiMap().addBinding(address1, "a", "A"))
@@ -328,13 +294,6 @@ class ReplicatedDataSerializerSpec
       checkSerialization(d3)
     }
 
-    "be compatible with old ORMultiMap serialization" in {
-      // Below blob was created with previous version of the serializer
-      val oldBlobAsBase64 =
-        "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwCakJEqZJiTBQK4QISxJmqSpSpqlKMjgDlsHjDpwAAAA=="
-      checkCompatibility(oldBlobAsBase64, ORMultiMap())
-    }
-
     "serialize ORMultiMap withValueDeltas" in {
       checkSerialization(ORMultiMap._emptyWithValueDeltas)
       checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, "a", "A"))
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 3b65d4b1b3..02c65eba0d 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -210,7 +210,8 @@ object AkkaBuild {
 
       defaults ++ CliOptions.runningOnCi
         .ifTrue(jvmGCLogOptions(JdkOptions.isJdk11orHigher, JdkOptions.isJdk8))
-        .getOrElse(Nil)
+        .getOrElse(Nil) ++
+      JdkOptions.versionSpecificJavaOptions
     },
     // all system properties passed to sbt prefixed with "akka." or "aeron." will be passed on to the forked jvms as is
     Test / javaOptions := {
diff --git a/project/JdkOptions.scala b/project/JdkOptions.scala
index e5be0d92f9..faa68e2c80 100644
--- a/project/JdkOptions.scala
+++ b/project/JdkOptions.scala
@@ -26,6 +26,16 @@ object JdkOptions extends AutoPlugin {
     VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(s"=1.8"))
   val isJdk11orHigher: Boolean =
     VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(">=11"))
+  val isJdk17orHigher: Boolean =
+    VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(">=17"))
+
+  val versionSpecificJavaOptions =
+    if (isJdk17orHigher) {
+      // for aeron
+      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
+      // for LevelDB
+      "--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil
+    } else Nil
 
   def notOnJdk8[T](values: Seq[T]): Seq[T] = if (isJdk8) Seq.empty[T] else values
 
diff --git a/project/MultiNode.scala b/project/MultiNode.scala
index fe19212c9f..d74083c770 100644
--- a/project/MultiNode.scala
+++ b/project/MultiNode.scala
@@ -60,7 +60,7 @@ object MultiNode extends AutoPlugin {
     }
 
     "-Xmx256m" :: akkaProperties :::
     CliOptions.sbtLogNoFormat.ifTrue("-Dakka.test.nocolor=true").toList
-  }
+  } ++ JdkOptions.versionSpecificJavaOptions
 
   private val anyConfigsInThisProject = ScopeFilter(configurations = inAnyConfiguration)