JDK 17 CI nightly job (#31085)
This commit is contained in:
parent
cea42b2a4d
commit
9b97614b95
8 changed files with 116 additions and 93 deletions
5
.github/workflows/nightly-builds.yml
vendored
5
.github/workflows/nightly-builds.yml
vendored
|
|
@ -3,6 +3,7 @@ name: Nightly Builds
|
|||
on:
|
||||
schedule:
|
||||
- cron: "0 0 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
|
||||
|
|
@ -120,12 +121,14 @@ jobs:
|
|||
# binary version is required and Akka build will set the right
|
||||
# full version from it.
|
||||
scalaVersion: ["2.12", "2.13"]
|
||||
jdkVersion: ["adopt@1.8.0", "adopt@1.11"]
|
||||
jdkVersion: ["adopt@1.8.0", "adopt@1.11", "openjdk@1.17.0"]
|
||||
include:
|
||||
- jdkVersion: adopt@1.8.0
|
||||
extraOpts: "-Dmultinode.XX:+PrintGCDetails -Dmultinode.XX:+PrintGCTimeStamps"
|
||||
- jdkVersion: adopt@1.11
|
||||
extraOpts: "-Dmultinode.Xlog:gc"
|
||||
- jdkVersion: openjdk@1.17.0
|
||||
extraopts: "-Dmultinode.Xlog:gc"
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
|
|
|||
|
|
@ -4,21 +4,25 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import java.util
|
||||
import java.util.concurrent.{ Executors, TimeoutException }
|
||||
|
||||
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
||||
import akka.ConfigurationException
|
||||
import akka.Done
|
||||
import akka.actor.CoordinatedShutdown.Phase
|
||||
import akka.actor.CoordinatedShutdown.UnknownReason
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.testkit.{ AkkaSpec, EventFilter, TestKit, TestProbe }
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.EventFilter
|
||||
import akka.testkit.TestKit
|
||||
import akka.testkit.TestProbe
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
|
||||
class CoordinatedShutdownSpec
|
||||
extends AkkaSpec(ConfigFactory.parseString("""
|
||||
|
|
@ -573,9 +577,9 @@ class CoordinatedShutdownSpec
|
|||
akka.coordinated-shutdown.run-by-actor-system-terminate = off
|
||||
""")
|
||||
|
||||
override def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||
override def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
|
||||
val cancellable =
|
||||
CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
myHooksCount should ===(1) // one user, none from system
|
||||
cancellable.cancel()
|
||||
}
|
||||
|
|
@ -589,9 +593,9 @@ class CoordinatedShutdownSpec
|
|||
akka.coordinated-shutdown.run-by-actor-system-terminate = off
|
||||
""")
|
||||
|
||||
override def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||
override def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
|
||||
val cancellable =
|
||||
CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
myHooksCount should ===(2) // one user, one from system
|
||||
|
||||
cancellable.cancel()
|
||||
|
|
@ -605,9 +609,9 @@ class CoordinatedShutdownSpec
|
|||
akka.coordinated-shutdown.terminate-actor-system = on
|
||||
""")
|
||||
|
||||
def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||
def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
|
||||
val cancellable =
|
||||
CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
myHooksCount should ===(2) // one user, one from actor system
|
||||
cancellable.cancel()
|
||||
}
|
||||
|
|
@ -620,9 +624,9 @@ class CoordinatedShutdownSpec
|
|||
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
|
||||
""")
|
||||
|
||||
def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||
def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
|
||||
val cancellable =
|
||||
CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
coordinatedShutdown.addCancellableJvmShutdownHook(println(s"User JVM hook from ${newSystem.name}"))
|
||||
myHooksCount should ===(1) // one user, none from actor system
|
||||
cancellable.cancel()
|
||||
}
|
||||
|
|
@ -635,7 +639,7 @@ class CoordinatedShutdownSpec
|
|||
akka.coordinated-shutdown.terminate-actor-system = on
|
||||
""")
|
||||
|
||||
def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||
def withSystemRunning(newSystem: ActorSystem, coordinatedShutdown: CoordinatedShutdown): Unit = {
|
||||
TestKit.shutdownActorSystem(newSystem)
|
||||
CoordinatedShutdown(newSystem)
|
||||
|
||||
|
|
@ -740,32 +744,50 @@ class CoordinatedShutdownSpec
|
|||
|
||||
abstract class JvmHookTest {
|
||||
|
||||
private val initialHookCount = trixyTrixCountJvmHooks(systemName)
|
||||
initialHookCount should ===(0)
|
||||
|
||||
def systemName: String
|
||||
def systemConfig: Config
|
||||
def withSystemRunning(system: ActorSystem): Unit
|
||||
def withSystemRunning(system: ActorSystem, cs: CoordinatedShutdown): Unit
|
||||
|
||||
val newSystem = ActorSystem(systemName, systemConfig)
|
||||
private val newSystem =
|
||||
ActorSystem(systemName, systemConfig.withFallback(system.settings.config)).asInstanceOf[ExtendedActorSystem]
|
||||
private var shutdownHooks = Set.empty[Thread]
|
||||
private val mockRuntime = new JVMShutdownHooks {
|
||||
override def addHook(t: Thread): Unit = synchronized {
|
||||
// mimic validation in JDK ApplicationShutdownHooks
|
||||
if (shutdownHooks == null)
|
||||
throw new IllegalStateException("Shutdown in progress");
|
||||
|
||||
withSystemRunning(newSystem)
|
||||
if (t.isAlive())
|
||||
throw new IllegalArgumentException("Hook already running");
|
||||
|
||||
TestKit.shutdownActorSystem(newSystem)
|
||||
if (shutdownHooks.contains(t))
|
||||
throw new IllegalArgumentException("Hook previously registered");
|
||||
|
||||
trixyTrixCountJvmHooks(systemName) should ===(0)
|
||||
shutdownHooks += t
|
||||
}
|
||||
|
||||
protected def myHooksCount: Int = trixyTrixCountJvmHooks(systemName)
|
||||
override def removeHook(t: Thread): Boolean = synchronized {
|
||||
// mimic validation in JDK ApplicationShutdownHooks
|
||||
if (t == null)
|
||||
throw new NullPointerException();
|
||||
|
||||
private def trixyTrixCountJvmHooks(systemName: String): Int = {
|
||||
val clazz = Class.forName("java.lang.ApplicationShutdownHooks")
|
||||
val field = clazz.getDeclaredField("hooks")
|
||||
field.setAccessible(true)
|
||||
clazz.synchronized {
|
||||
val hooks = field.get(null).asInstanceOf[util.IdentityHashMap[Thread, Thread]]
|
||||
hooks.values().asScala.count(_.getName.startsWith(systemName))
|
||||
if (shutdownHooks.contains(t)) {
|
||||
shutdownHooks -= t
|
||||
true
|
||||
} else false
|
||||
}
|
||||
}
|
||||
private val csConfig = newSystem.settings.config.getConfig("akka.coordinated-shutdown")
|
||||
// pretend extension creation and start
|
||||
private val cs = new CoordinatedShutdown(newSystem, CoordinatedShutdown.phasesFromConfig(csConfig), mockRuntime)
|
||||
CoordinatedShutdown.init(newSystem, csConfig, cs)
|
||||
|
||||
withSystemRunning(newSystem, cs)
|
||||
|
||||
TestKit.shutdownActorSystem(newSystem)
|
||||
shutdownHooks should have size (0)
|
||||
|
||||
protected def myHooksCount: Int = synchronized(shutdownHooks.size)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
# internal constructor of CoordinatedShutdown changed for testability
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown.this")
|
||||
|
|
@ -188,6 +188,24 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
|
||||
val phases = phasesFromConfig(conf)
|
||||
val coord = new CoordinatedShutdown(system, phases)
|
||||
init(system, conf, coord)
|
||||
coord
|
||||
}
|
||||
|
||||
// locate reason-specific overrides and merge with defaults.
|
||||
@InternalApi private[akka] def confWithOverrides(conf: Config, reason: Option[Reason]): Config = {
|
||||
reason
|
||||
.flatMap { r =>
|
||||
val basePath = s"""reason-overrides."${r.getClass.getName}""""
|
||||
if (conf.hasPath(basePath)) Some(conf.getConfig(basePath).withFallback(conf)) else None
|
||||
}
|
||||
.getOrElse(conf)
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
@InternalApi
|
||||
private[akka] def init(system: ExtendedActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
|
||||
// separated for testability
|
||||
initPhaseActorSystemTerminate(system, conf, coord)
|
||||
initJvmHook(system, conf, coord)
|
||||
// Avoid leaking actor system references when system is terminated before JVM is #23384
|
||||
|
|
@ -206,17 +224,6 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
catch {
|
||||
case _: RejectedExecutionException => cleanupActorSystemJvmHook()
|
||||
}
|
||||
coord
|
||||
}
|
||||
|
||||
// locate reason-specific overrides and merge with defaults.
|
||||
@InternalApi private[akka] def confWithOverrides(conf: Config, reason: Option[Reason]): Config = {
|
||||
reason
|
||||
.flatMap { r =>
|
||||
val basePath = s"""reason-overrides."${r.getClass.getName}""""
|
||||
if (conf.hasPath(basePath)) Some(conf.getConfig(basePath).withFallback(conf)) else None
|
||||
}
|
||||
.getOrElse(conf)
|
||||
}
|
||||
|
||||
private def initPhaseActorSystemTerminate(
|
||||
|
|
@ -348,9 +355,28 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] trait JVMShutdownHooks {
|
||||
def addHook(t: Thread): Unit
|
||||
def removeHook(t: Thread): Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object JVMShutdownHooks extends JVMShutdownHooks {
|
||||
override def addHook(t: Thread): Unit = Runtime.getRuntime.addShutdownHook(t)
|
||||
override def removeHook(t: Thread): Boolean = Runtime.getRuntime.removeShutdownHook(t)
|
||||
}
|
||||
|
||||
final class CoordinatedShutdown private[akka] (
|
||||
system: ExtendedActorSystem,
|
||||
phases: Map[String, CoordinatedShutdown.Phase])
|
||||
phases: Map[String, CoordinatedShutdown.Phase],
|
||||
jvmShutdownHooks: JVMShutdownHooks = JVMShutdownHooks)
|
||||
extends Extension {
|
||||
import CoordinatedShutdown.{ Reason, UnknownReason }
|
||||
|
||||
|
|
@ -817,12 +843,12 @@ final class CoordinatedShutdown private[akka] (
|
|||
}
|
||||
thread.setName(s"${system.name}-shutdown-hook-${newLatch.getCount}")
|
||||
try {
|
||||
Runtime.getRuntime.addShutdownHook(thread)
|
||||
jvmShutdownHooks.addHook(thread)
|
||||
new Cancellable {
|
||||
@volatile var cancelled = false
|
||||
def cancel(): Boolean = {
|
||||
try {
|
||||
if (Runtime.getRuntime.removeShutdownHook(thread)) {
|
||||
if (jvmShutdownHooks.removeHook(thread)) {
|
||||
cancelled = true
|
||||
_jvmHooksLatch.get.countDown()
|
||||
true
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@
|
|||
|
||||
package akka.cluster.ddata.protobuf
|
||||
|
||||
import java.util.Base64
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
|
@ -57,17 +55,6 @@ class ReplicatedDataSerializerSpec
|
|||
shutdown()
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a blob created with the previous serializer (with only string keys for maps). If we deserialize it and then
|
||||
* serialize it again and arrive at the same BLOB we can assume that we are compatible in both directions.
|
||||
*/
|
||||
def checkCompatibility(oldBlobAsBase64: String, obj: AnyRef): Unit = {
|
||||
val oldBlob = Base64.getDecoder.decode(oldBlobAsBase64)
|
||||
val deserialized = serializer.fromBinary(oldBlob, serializer.manifest(obj))
|
||||
val newBlob = serializer.toBinary(deserialized)
|
||||
newBlob should equal(oldBlob)
|
||||
}
|
||||
|
||||
def checkSerialization(obj: AnyRef): Int = {
|
||||
val blob = serializer.toBinary(obj)
|
||||
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
|
||||
|
|
@ -262,13 +249,6 @@ class ReplicatedDataSerializerSpec
|
|||
checkSerialization(ORMap().put(address1, Flag(), GSet() + "A").delta.get)
|
||||
}
|
||||
|
||||
"be compatible with old ORMap serialization" in {
|
||||
// Below blob was created with previous version of the serializer
|
||||
val oldBlobAsBase64 =
|
||||
"H4sIAAAAAAAAAOOax8jlyaXMJc8lzMWXX5KRWqSXkV9copdflC7wXEWUiYGBQRaIGQQkuJS45LiEuHiL83NTUdQwwtWIC6kQpUqVKAulGBOlGJOE+LkYE4W4uJi5GB0FuJUYnUACSRABJ7AAAOLO3C3DAAAA"
|
||||
checkCompatibility(oldBlobAsBase64, ORMap())
|
||||
}
|
||||
|
||||
"serialize LWWMap" in {
|
||||
checkSerialization(LWWMap())
|
||||
checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any]))
|
||||
|
|
@ -281,13 +261,6 @@ class ReplicatedDataSerializerSpec
|
|||
.put(address2, "b", 17, LWWRegister.defaultClock[Any]))
|
||||
}
|
||||
|
||||
"be compatible with old LWWMap serialization" in {
|
||||
// Below blob was created with previous version of the serializer
|
||||
val oldBlobAsBase64 =
|
||||
"H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC0kJEqZJiTBSy4AISxhwzrl2fuyRMiIAWKS4utrLEnNJUQwERAD96/peLAAAA"
|
||||
checkCompatibility(oldBlobAsBase64, LWWMap())
|
||||
}
|
||||
|
||||
"serialize PNCounterMap" in {
|
||||
checkSerialization(PNCounterMap())
|
||||
checkSerialization(PNCounterMap().increment(address1, "a", 3))
|
||||
|
|
@ -298,13 +271,6 @@ class ReplicatedDataSerializerSpec
|
|||
PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 2).increment(address2, "b", 5))
|
||||
}
|
||||
|
||||
"be compatible with old PNCounterMap serialization" in {
|
||||
// Below blob was created with previous version of the serializer
|
||||
val oldBlobAsBase64 =
|
||||
"H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC8kJEqZJiTBTS4wISmlyqXMqE1AsxMgsxAADYQs/9gQAAAA=="
|
||||
checkCompatibility(oldBlobAsBase64, PNCounterMap())
|
||||
}
|
||||
|
||||
"serialize ORMultiMap" in {
|
||||
checkSerialization(ORMultiMap())
|
||||
checkSerialization(ORMultiMap().addBinding(address1, "a", "A"))
|
||||
|
|
@ -328,13 +294,6 @@ class ReplicatedDataSerializerSpec
|
|||
checkSerialization(d3)
|
||||
}
|
||||
|
||||
"be compatible with old ORMultiMap serialization" in {
|
||||
// Below blob was created with previous version of the serializer
|
||||
val oldBlobAsBase64 =
|
||||
"H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwCakJEqZJiTBQK4QISxJmqSpSpqlKMjgDlsHjDpwAAAA=="
|
||||
checkCompatibility(oldBlobAsBase64, ORMultiMap())
|
||||
}
|
||||
|
||||
"serialize ORMultiMap withValueDeltas" in {
|
||||
checkSerialization(ORMultiMap._emptyWithValueDeltas)
|
||||
checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, "a", "A"))
|
||||
|
|
|
|||
|
|
@ -210,7 +210,8 @@ object AkkaBuild {
|
|||
|
||||
defaults ++ CliOptions.runningOnCi
|
||||
.ifTrue(jvmGCLogOptions(JdkOptions.isJdk11orHigher, JdkOptions.isJdk8))
|
||||
.getOrElse(Nil)
|
||||
.getOrElse(Nil) ++
|
||||
JdkOptions.versionSpecificJavaOptions
|
||||
},
|
||||
// all system properties passed to sbt prefixed with "akka." or "aeron." will be passed on to the forked jvms as is
|
||||
Test / javaOptions := {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,16 @@ object JdkOptions extends AutoPlugin {
|
|||
VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(s"=1.8"))
|
||||
val isJdk11orHigher: Boolean =
|
||||
VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(">=11"))
|
||||
val isJdk17orHigher: Boolean =
|
||||
VersionNumber(specificationVersion).matchesSemVer(SemanticSelector(">=17"))
|
||||
|
||||
val versionSpecificJavaOptions =
|
||||
if (isJdk17orHigher) {
|
||||
// for aeron
|
||||
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
|
||||
// for LevelDB
|
||||
"--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil
|
||||
} else Nil
|
||||
|
||||
def notOnJdk8[T](values: Seq[T]): Seq[T] = if (isJdk8) Seq.empty[T] else values
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ object MultiNode extends AutoPlugin {
|
|||
}
|
||||
|
||||
"-Xmx256m" :: akkaProperties ::: CliOptions.sbtLogNoFormat.ifTrue("-Dakka.test.nocolor=true").toList
|
||||
}
|
||||
} ++ JdkOptions.versionSpecificJavaOptions
|
||||
|
||||
private val anyConfigsInThisProject = ScopeFilter(configurations = inAnyConfiguration)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue