From d75ad252b85a47ed896f2a1782d161bbe36b7fe1 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Fri, 27 Aug 2021 16:40:51 +0100 Subject: [PATCH] Run multi-node tests on fresh GKE clusters (#30570) * Port multi-node-test from Jenkins * link to issues for gh-excluded tests * use kubectl in multi-jvm plugin --- .github/workflows/multi-node.yml | 80 +++ .gitignore | 6 + .jvmopts-ci | 13 +- .../metrics/ClusterMetricsRoutingSpec.scala | 16 +- ...sterShardingRememberEntitiesPerfSpec.scala | 18 +- ...sterShardingPreparingForShutdownSpec.scala | 4 +- ...ndomizedBrainResolverIntegrationSpec.scala | 3 +- .../SplitBrainResolverIntegrationSpec.scala | 7 +- .../ClusterShardingRememberEntitiesSpec.scala | 3 +- .../scala/akka/cluster/QuickRestartSpec.scala | 22 +- .../RemoteFeaturesWithClusterSpec.scala | 6 +- .../cluster/RestartFirstSeedNodeSpec.scala | 5 +- .../scala/akka/cluster/RestartNode2Spec.scala | 4 +- .../scala/akka/cluster/RestartNode3Spec.scala | 4 +- .../scala/akka/cluster/RestartNodeSpec.scala | 4 +- .../scala/akka/cluster/StressSpec.scala | 4 +- .../akka/cluster/ddata/DurableDataSpec.scala | 2 +- .../cluster/ddata/DurablePruningSpec.scala | 2 +- .../src/main/paradox/multi-jvm-testing.md | 27 +- .../akka/remote/testkit/MultiNodeSpec.scala | 32 ++ .../aeron/AeronStreamConcistencySpec.scala | 8 +- .../artery/aeron/AeronStreamLatencySpec.scala | 9 +- .../aeron/AeronStreamMaxThroughputSpec.scala | 8 +- build.sbt | 2 +- kubernetes/.gitignore | 1 + kubernetes/create-cluster-gke.sh | 80 +++ kubernetes/setup.sh | 21 + kubernetes/test-node-base.yaml | 92 +++ project/Jvm.scala | 118 ++++ project/MultiNode.scala | 4 +- project/SbtMultiJvm.scala | 543 ++++++++++++++++++ project/plugins.sbt | 5 - 32 files changed, 1087 insertions(+), 66 deletions(-) create mode 100644 .github/workflows/multi-node.yml create mode 100644 kubernetes/.gitignore create mode 100755 kubernetes/create-cluster-gke.sh create mode 100755 kubernetes/setup.sh create mode 100644 kubernetes/test-node-base.yaml create mode 100644 project/Jvm.scala create mode 100644 project/SbtMultiJvm.scala diff --git a/.github/workflows/multi-node.yml b/.github/workflows/multi-node.yml new file mode 100644 index 0000000000..ac104ac75a --- /dev/null +++ b/.github/workflows/multi-node.yml @@ -0,0 +1,80 @@ +name: Multi node test + +on: + schedule: + - cron: '0 0 * * *' + +concurrency: + # Only run once for latest commit per ref and cancel other (previous) runs. 
+  group: ci-multi-node-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  run-multi-node-tests:
+    name: Multi Node Test
+    runs-on: ubuntu-20.04
+    if: github.repository == 'akka/akka'
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - name: Install Kubectl
+        run: |
+          sudo snap install kubectl --classic
+      - uses: google-github-actions/setup-gcloud@v0.2
+        with:
+          service_account_key: ${{ secrets.GKE_SA_KEY }}
+          project_id: ${{ secrets.GKE_PROJECT }}
+      - name: Create the cluster
+        run: |-
+          gcloud config set compute/region us-central1
+          gcloud config set compute/zone us-central1-c
+          ./kubernetes/create-cluster-gke.sh "akka-multi-node-${GITHUB_RUN_ID}"
+      - name: Setup Pods
+        run: |
+          # Stress tests use 13 nodes
+          ./kubernetes/setup.sh 15 multi-node-test.hosts
+      - name: Set up JDK 11
+        uses: olafurpg/setup-scala@v10
+        with:
+          java-version: adopt@1.11.0-9
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@v6.2
+      - name: Multi node test
+        run: |
+          cat multi-node-test.hosts
+          sbt -jvm-opts .jvmopts-ci \
+            -Dakka.test.timefactor=2 \
+            -Dakka.cluster.assert=on \
+            -Dsbt.override.build.repos=false \
+            -Dakka.test.tags.exclude=gh-exclude \
+            -Dakka.test.multi-node=true \
+            -Dakka.test.multi-node.targetDirName=${PWD}/target/${{ github.run_id }} \
+            -Dakka.test.multi-node.java=${JAVA_HOME}/bin/java \
+            -Dmultinode.XX:MetaspaceSize=128M \
+            -Dmultinode.Xms512M \
+            -Dmultinode.Xmx1536M \
+            -Dmultinode.Xlog:gc \
+            multiNodeTest
+      - name: Email on failure
+        if: ${{ failure() }}
+        uses: dawidd6/action-send-mail@v3
+        with:
+          server_address: smtp.gmail.com
+          server_port: 465
+          # Using port 465 already sets `secure: true`
+          secure: true
+          username: ${{secrets.MAIL_USERNAME}}
+          password: ${{secrets.MAIL_PASSWORD}}
+          subject: Multi node test (Akka)
+          to: akka.official@gmail.com
+          from: Akka CI (GHActions)
+          body: |
+            Multi node test of ${{github.repository}} failed!
+            https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
+      - name: Cleanup the environment
+        if: ${{ always() }}
+        shell: bash {0}
+        run: |
+          gcloud container clusters delete "akka-multi-node-${GITHUB_RUN_ID}" --quiet
diff --git a/.gitignore b/.gitignore
index b89198cfd3..f914f3e6f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -90,8 +90,14 @@ test-output
 factorials.txt
 factorial2.txt
+all-projects.txt
+multi-node-projects.txt
+/multi-node-test.hosts
+
 # Default sigar library extract location.
native/ /dumps/ /core + +.tmp diff --git a/.jvmopts-ci b/.jvmopts-ci index 0db4313407..7fefb60249 100644 --- a/.jvmopts-ci +++ b/.jvmopts-ci @@ -1,6 +1,13 @@ -# This is used to configure the sbt instance that github actions launches +# This is used to configure the sbt instance in CI --Xms2G --Xmx2G +-XX:+UseG1GC +-Xms3G +-Xmx3G -Xss2M -XX:ReservedCodeCacheSize=256m +-XX:MaxGCPauseMillis=750 +-XX:-UseBiasedLocking +-XX:+UseCompressedOops +-XX:MetaspaceSize=512M +-XX:-ClassUnloadingWithConcurrentMark +-Djava.security.egd=file:/dev/./urandom diff --git a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala index 10b571dbcc..b2c7161fc9 100644 --- a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala +++ b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala @@ -6,6 +6,7 @@ package akka.cluster.metrics import java.lang.management.ManagementFactory +import scala.annotation.nowarn import scala.concurrent.Await import scala.concurrent.duration._ @@ -25,7 +26,7 @@ import akka.routing.FromConfig import akka.routing.GetRoutees import akka.routing.Routees import akka.serialization.jackson.CborSerializable -import akka.testkit.{ DefaultTimeout, ImplicitSender, LongRunningTest } +import akka.testkit.{ DefaultTimeout, GHExcludeTest, ImplicitSender, LongRunningTest } import akka.util.unused object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig { @@ -118,6 +119,7 @@ class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRout class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec +@nowarn abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterConfig) with MultiNodeClusterSpec @@ -170,7 +172,8 @@ abstract class AdaptiveLoadBalancingRouterSpec enterBarrier("after-1") } - "use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in { + // Excluded on GH Actions: https://github.com/akka/akka/issues/30486 + "use all nodes in the cluster when not overloaded" taggedAs (LongRunningTest, GHExcludeTest) in { runOn(node1) { val router1 = startRouter("router1") @@ -196,7 +199,8 @@ abstract class AdaptiveLoadBalancingRouterSpec enterBarrier("after-2") } - "prefer node with more free heap capacity" taggedAs LongRunningTest in { + // Excluded on GH Actions: https://github.com/akka/akka/issues/30486 + "prefer node with more free heap capacity" taggedAs (LongRunningTest, GHExcludeTest) in { System.gc() enterBarrier("gc") @@ -229,7 +233,8 @@ abstract class AdaptiveLoadBalancingRouterSpec enterBarrier("after-3") } - "create routees from configuration" taggedAs LongRunningTest in { + // Excluded on GH Actions: https://github.com/akka/akka/issues/30486 + "create routees from configuration" taggedAs (LongRunningTest, GHExcludeTest) in { runOn(node1) { val router3 = system.actorOf(FromConfig.props(Props[Memory]()), "router3") // it may take some time until router receives cluster member events @@ -240,7 +245,8 @@ abstract class AdaptiveLoadBalancingRouterSpec enterBarrier("after-4") } - "create routees from cluster.enabled configuration" taggedAs LongRunningTest in { + // Excluded on GH Actions: https://github.com/akka/akka/issues/30486 + "create routees from cluster.enabled configuration" taggedAs 
(LongRunningTest, GHExcludeTest) in {
       runOn(node1) {
         val router4 = system.actorOf(FromConfig.props(Props[Memory]()), "router4")
         // it may take some time until router receives cluster member events
diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala
index 1d1c18f5d7..f9bf5cc50c 100644
--- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala
+++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala
@@ -177,7 +177,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
       enterBarrier(s"after-start-stop-${testRun}")
     }
 
-    "test when starting new entity" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test when starting new entity" taggedAs GHExcludeTest in {
       val numberOfMessages = 200 * NrOfMessagesFactor
       runBench("start new entities") { (iteration, region, histogram) =>
         (1 to numberOfMessages).foreach { n =>
@@ -190,7 +191,8 @@
       }
     }
 
-    "test latency when starting new entity and sending a few messages" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test latency when starting new entity and sending a few messages" taggedAs GHExcludeTest in {
       val numberOfMessages = 800 * NrOfMessagesFactor
       runBench("start, few messages") { (iteration, region, histogram) =>
         for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) {
@@ -203,7 +205,8 @@
       }
     }
 
-    "test latency when starting new entity and sending a few messages to it and stopping" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test latency when starting new entity and sending a few messages to it and stopping" taggedAs GHExcludeTest in {
       val numberOfMessages = 800 * NrOfMessagesFactor
       // 160 entities, and an extra one for the initialization
       // all but the first one are not removed
@@ -237,7 +240,8 @@
       }
     }
 
-    "test latency when starting, few messages, stopping, few messages" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test latency when starting, few messages, stopping, few messages" taggedAs GHExcludeTest in {
       val numberOfMessages = 800 * NrOfMessagesFactor
       runBench("start, few messages, stop, few messages") { (iteration, region, histogram) =>
         for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) {
@@ -260,7 +264,8 @@
       }
     }
 
-    "test when starting some new entities mixed with sending to started" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test when starting some new entities mixed with sending to started" taggedAs GHExcludeTest in {
       runBench("starting mixed with sending to started") { (iteration, region, histogram) =>
         val numberOfMessages = 1600 * NrOfMessagesFactor
         (1 to numberOfMessages).foreach { n =>
@@ -284,7 +289,8 @@
       }
     }
 
-    "test sending to started" in {
+    // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
+    "test sending to started" taggedAs GHExcludeTest in {
       runBench("sending to started") { (iteration, region, histogram) =>
         val numberOfMessages = 1600 * NrOfMessagesFactor
         (1 to
numberOfMessages).foreach { n => diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala index 7e401079ff..e0904c68e8 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala @@ -21,6 +21,7 @@ import akka.cluster.typed.PrepareForFullClusterShutdown import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.serialization.jackson.CborSerializable +import akka.testkit.GHExcludeTest import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ @@ -78,7 +79,8 @@ class ClusterShardingPreparingForShutdownSpec formCluster(first, second, third) } - "not rebalance but should still work preparing for shutdown" in { + // Excluded on GH Actions: https://github.com/akka/akka/issues/30486 + "not rebalance but should still work preparing for shutdown" taggedAs GHExcludeTest in { val shardRegion: ActorRef[ShardingEnvelope[Command]] = sharding.init(Entity(typeKey)(_ => Pinger())) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala index 13a74c4de7..6b830064af 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala @@ -130,8 +130,7 @@ class RandomizedSplitBrainResolverIntegrationSpec c += 1 val sys: ActorSystem = { - - val sys = ActorSystem(system.name + "-" + c, system.settings.config) + val sys = ActorSystem(system.name + "-" + c, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) val gremlinController = sys.actorOf(GremlinController.props, "gremlinController") system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c") sys diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/SplitBrainResolverIntegrationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/SplitBrainResolverIntegrationSpec.scala index 05b6ca55c5..cbc0671f6e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/SplitBrainResolverIntegrationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/SplitBrainResolverIntegrationSpec.scala @@ -120,9 +120,10 @@ class SplitBrainResolverIntegrationSpec val sys = ActorSystem( system.name + "-" + c, - scenario.cfg - .withValue("akka.cluster.multi-data-center.self-data-center", ConfigValueFactory.fromAnyRef(dcName)) - .withFallback(system.settings.config)) + MultiNodeSpec.configureNextPortIfFixed( + scenario.cfg + .withValue("akka.cluster.multi-data-center.self-data-center", ConfigValueFactory.fromAnyRef(dcName)) + .withFallback(system.settings.config))) val gremlinController = sys.actorOf(GremlinController.props, "gremlinController") system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c") sys diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala 
b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 68b5486e0f..6784ddba44 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -11,6 +11,7 @@ import com.typesafe.config.ConfigFactory import akka.actor._ import akka.cluster.{ Cluster, MemberStatus } import akka.testkit._ +import akka.remote.testkit.MultiNodeSpec import akka.util.ccompat._ @ccompatUsedUntil213 @@ -200,7 +201,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard } // no nodes left of the original cluster, start a new cluster - val sys2 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) val entityProbe2 = TestProbe()(sys2) val probe2 = TestProbe()(sys2) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index eac9ea5d3a..286406bdb9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -46,6 +46,7 @@ abstract class QuickRestartSpec val rounds = 3 + override def verifySystemShutdown: Boolean = true override def expectedTestDuration: FiniteDuration = 45.seconds * rounds "Quickly restarting node" must { @@ -61,20 +62,21 @@ abstract class QuickRestartSpec for (n <- 1 to rounds) { log.info("round-" + n) runOn(second) { - restartingSystem = - if (restartingSystem == null) - ActorSystem( - system.name, - ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]").withFallback(system.settings.config)) - else - ActorSystem( - system.name, - // use the same port - ConfigFactory.parseString(s""" + restartingSystem = if (restartingSystem == null) { + ActorSystem( + system.name, + MultiNodeSpec.configureNextPortIfFixed( + ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]").withFallback(system.settings.config))) + } else { + ActorSystem( + system.name, + // use the same port + ConfigFactory.parseString(s""" akka.cluster.roles = [round-$n] akka.remote.classic.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get} akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get} """).withFallback(system.settings.config)) + } log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress) Cluster(restartingSystem).joinSeedNodes(seedNodes) within(20.seconds) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RemoteFeaturesWithClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RemoteFeaturesWithClusterSpec.scala index 9e5af3717c..5b38c1fd8a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RemoteFeaturesWithClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RemoteFeaturesWithClusterSpec.scala @@ -26,12 +26,14 @@ class ClusterRemoteFeaturesConfig(artery: Boolean) extends MultiNodeConfig { val second = role("second") val third = role("third") - private val baseConfig = ConfigFactory.parseString(s""" + private val baseConfig = { + ConfigFactory.parseString(s""" akka.remote.log-remote-lifecycle-events = off akka.remote.artery.enabled = $artery - akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.port = ${MultiNodeSpec.selfPort} 
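+        # selfPort is the -Dmultinode.port system property: 0 (pick a random
+        # port) for local runs, a fixed exposed port such as 5000 when the
+        # nodes run as Kubernetes pods (see kubernetes/setup.sh).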
akka.log-dead-letters-during-shutdown = off """).withFallback(MultiNodeClusterSpec.clusterConfig) + } commonConfig(debugConfig(on = false).withFallback(baseConfig)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index 59eedba434..8546942e5c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -52,7 +52,9 @@ abstract class RestartFirstSeedNodeSpec @volatile var seedNode1Address: Address = _ // use a separate ActorSystem, to be able to simulate restart - lazy val seed1System = ActorSystem(system.name, system.settings.config) + lazy val seed1System = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) + + override def verifySystemShutdown: Boolean = true def missingSeed = address(seed3).copy(port = Some(61313)) def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2, seed3, missingSeed) @@ -67,6 +69,7 @@ abstract class RestartFirstSeedNodeSpec override def afterAll(): Unit = { runOn(seed1) { shutdown(if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System) + } super.afterAll() } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala index a2c31c94d1..1c1d0f362a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala @@ -56,7 +56,9 @@ abstract class RestartNode2SpecSpec @volatile var seedNode1Address: Address = _ // use a separate ActorSystem, to be able to simulate restart - lazy val seed1System = ActorSystem(system.name, system.settings.config) + lazy val seed1System = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) + + override def verifySystemShutdown: Boolean = true def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala index 4bab117a54..8cb59eed58 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala @@ -56,7 +56,9 @@ abstract class RestartNode3Spec @volatile var secondUniqueAddress: UniqueAddress = _ // use a separate ActorSystem, to be able to simulate restart - lazy val secondSystem = ActorSystem(system.name, system.settings.config) + lazy val secondSystem = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) + + override def verifySystemShutdown: Boolean = true def seedNodes: immutable.IndexedSeq[Address] = Vector(first) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 266acd5406..eb5eb2fd3d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -76,7 +76,9 @@ abstract class RestartNodeSpec @volatile var secondUniqueAddress: UniqueAddress = _ // use a separate ActorSystem, to be able to simulate restart - lazy val secondSystem = ActorSystem(system.name, system.settings.config) + lazy val secondSystem = 
ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) + + override def verifySystemShutdown: Boolean = true def seedNodes: immutable.IndexedSeq[Address] = Vector(first, secondUniqueAddress.address, third) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 175f79a8d5..cafad2ebd0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -458,6 +458,8 @@ abstract class StressSpec override def shutdownTimeout: FiniteDuration = 30.seconds.dilated + override def verifySystemShutdown: Boolean = true + override def muteLog(sys: ActorSystem = system): Unit = { super.muteLog(sys) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) @@ -782,7 +784,7 @@ abstract class StressSpec previousAS.foreach { as => TestKit.shutdownActorSystem(as) } - val sys = ActorSystem(system.name, system.settings.config) + val sys = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) muteLog(sys) Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq.map(address)) Some(sys) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala index 110fca945a..629e8604be 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala @@ -246,7 +246,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) "handle Update before load" in { runOn(first) { - val sys1 = ActorSystem("AdditionalSys", system.settings.config) + val sys1 = ActorSystem("AdditionalSys", MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) val address = Cluster(sys1).selfAddress try { Cluster(sys1).join(address) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala index cd64f80a2d..3d03bb6a42 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala @@ -75,7 +75,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN join(first, first) join(second, first) - val sys2 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config)) val cluster2 = Cluster(sys2) val distributedData2 = DistributedData(sys2) val replicator2 = startReplicator(sys2) diff --git a/akka-docs/src/main/paradox/multi-jvm-testing.md b/akka-docs/src/main/paradox/multi-jvm-testing.md index fea709fc25..09dd1b20e9 100644 --- a/akka-docs/src/main/paradox/multi-jvm-testing.md +++ b/akka-docs/src/main/paradox/multi-jvm-testing.md @@ -1,6 +1,7 @@ --- project.description: Multi JVM testing of distributed systems built with Akka. --- + # Multi JVM Testing Supports running applications (objects with main methods) and ScalaTest tests in multiple JVMs at the same time. 
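Since the naming convention is what ties the whole setup together, a minimal sketch may help (hypothetical `Sample` objects, not part of this patch; the part of the name before the `MultiJvm` marker groups classes into one test, and the part after it names the JVM each class runs on):

```scala
// src/multi-jvm/scala/sample/Sample.scala — one "Sample" test across two JVMs
object SampleMultiJvmNode1 {
  def main(args: Array[String]): Unit = println("Hello from node 1")
}

object SampleMultiJvmNode2 {
  def main(args: Array[String]): Unit = println("Hello from node 2")
}
```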
@@ -11,20 +12,22 @@ Useful for integration testing where multiple systems communicate with each other
 
 The multi-JVM testing is an sbt plugin that you can find at [https://github.com/sbt/sbt-multi-jvm](https://github.com/sbt/sbt-multi-jvm).
 To configure it in your project you should do the following steps:
 
-1. Add it as a plugin by adding the following to your project/plugins.sbt:
-
-    @@snip [plugins.sbt](/project/plugins.sbt) { #sbt-multi-jvm }
-
-2. Add multi-JVM testing to `build.sbt` or `project/Build.scala` by enabling `MultiJvmPlugin` and
-setting the `MultiJvm` config.
+1. Add it as a plugin by adding the following to your project/plugins.sbt:
 
     ```none
-    lazy val root = (project in file("."))
-      .enablePlugins(MultiJvmPlugin)
-      .configs(MultiJvm)
+    addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
     ```
-
-**Please note** that by default MultiJvm test sources are located in `src/multi-jvm/...`,
+
+2. Add multi-JVM testing to `build.sbt` or `project/Build.scala` by enabling `MultiJvmPlugin` and
+   setting the `MultiJvm` config.
+
+    ```none
+    lazy val root = (project in file("."))
+      .enablePlugins(MultiJvmPlugin)
+      .configs(MultiJvm)
+    ```
+
+**Please note** that by default MultiJvm test sources are located in `src/multi-jvm/...`,
 and not in `src/test/...`.
 
 ## Running tests
@@ -148,7 +151,7 @@ directory as the test.
 
 For example, to feed the JVM options `-Dakka.remote.port=9991` and `-Xmx256m` to the
 `SampleMultiJvmNode1` let's create three `*.opts` files and add the options to them. Separate multiple options with
-space. 
+space.
 
 `SampleMultiJvmNode1.opts`:
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
index 955f6d4505..446de63d5f 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -170,6 +170,20 @@
   require(selfPort >= 0 && selfPort < 65535, "multinode.port is out of bounds: " + selfPort)
 
+  /**
+   * UDP Port number to be used on this node. 0 means a random port.
+   *
+   * {{{
+   * -Dmultinode.udp-port=0
+   * }}}
+   */
+  val udpPort: Option[Int] = Option(System.getProperty("multinode.udp-port")) match {
+    case None    => None
+    case Some(_) => Some(Integer.getInteger("multinode.udp-port", 0))
+  }
+
+  require(udpPort.getOrElse(1) >= 0 && udpPort.getOrElse(1) < 65535, "multinode.udp-port is out of bounds: " + udpPort)
+
   /**
    * Name (or IP address; must be resolvable using InetAddress.getByName)
    * of the host that the server node is running on.
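An aside on the parsing above (illustrative only, not part of the patch): `Integer.getInteger` returns its default when the property is missing or not a parseable number, so a malformed `-Dmultinode.udp-port` silently degrades to `Some(0)`, i.e. "random port", while an absent property yields `None`, which makes the Aeron specs further down query the remote `UdpPortActor` instead:

```scala
// Illustrative walk-through of the udpPort parsing above.
def parsedUdpPort(): Option[Int] =
  Option(System.getProperty("multinode.udp-port"))
    .map(_ => Integer.getInteger("multinode.udp-port", 0).intValue)

System.setProperty("multinode.udp-port", "6000")
assert(parsedUdpPort() == Some(6000)) // fixed port, as written by kubernetes/setup.sh

System.setProperty("multinode.udp-port", "not-a-number")
assert(parsedUdpPort() == Some(0))    // falls back to the default 0 => random port

System.clearProperty("multinode.udp-port")
assert(parsedUdpPort().isEmpty)       // unset => ask the UdpPortActor at runtime
```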
@@ -243,6 +257,24 @@ object MultiNodeSpec {
     ConfigFactory.parseMap(map.asJava)
   }
 
+  // Multi-node tests on Kubernetes require fixed ports to be mapped and exposed.
+  // This method changes the port bindings to avoid conflicts.
+  // Please note that with the current setup only ports 5000 and 5001 are exposed in Kubernetes.
+  def configureNextPortIfFixed(config: Config): Config = {
+    val arteryPortConfig = getNextPortString("akka.remote.artery.canonical.port", config)
+    val nettyPortConfig = getNextPortString("akka.remote.classic.netty.tcp.port", config)
+    ConfigFactory.parseString(s"""{
+      $arteryPortConfig
+      $nettyPortConfig
+    }""").withFallback(config)
+  }
+
+  private def getNextPortString(key: String, config: Config): String = {
+    val port = config.getInt(key)
+    if (port != 0)
+      s"""$key = ${port + 1}"""
+    else ""
+  }
 }
 
 /**
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala
index 5659caba83..7f2bab04f2 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala
@@ -79,8 +79,12 @@ abstract class AeronStreamConsistencySpec
   def channel(roleName: RoleName) = {
     val n = node(roleName)
-    system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
-    val port = expectMsgType[Int]
+    val port = MultiNodeSpec.udpPort match {
+      case None =>
+        system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
+        expectMsgType[Int]
+      case Some(p) => p
+    }
     s"aeron:udp?endpoint=${n.address.host.get}:$port"
   }
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala
index 086f0f3933..2c7639de20 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala
@@ -108,8 +108,13 @@ abstract class AeronStreamLatencySpec
   def channel(roleName: RoleName) = {
     val n = node(roleName)
-    system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
-    val port = expectMsgType[Int]
+
+    val port = MultiNodeSpec.udpPort match {
+      case None =>
+        system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
+        expectMsgType[Int]
+      case Some(p) => p
+    }
     s"aeron:udp?endpoint=${n.address.host.get}:$port"
   }
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala
index 253958350e..5f978a1d0f 100644
--- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala
@@ -108,8 +108,12 @@ abstract class AeronStreamMaxThroughputSpec
   def channel(roleName: RoleName) = {
     val n = node(roleName)
-    system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
-    val port = expectMsgType[Int]
+    val port = MultiNodeSpec.udpPort match {
+      case None =>
+        system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
+        expectMsgType[Int]
+      case Some(p) => p
+    }
     s"aeron:udp?endpoint=${n.address.host.get}:$port"
   }
diff --git a/build.sbt b/build.sbt
index 10d18066e3..6bc3bcb344 100644
--- a/build.sbt
+++ b/build.sbt
@@ -36,7 +36,7 @@ addCommandAlias(name = "sortImports", value = ";scalafixEnable; scalafixAll SortImports")
 
 import akka.AkkaBuild._
 import akka.{ AkkaBuild, Dependencies, OSGi, Protobuf, SigarLoader, VersionGenerator }
-import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
+import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys.MultiJvm
 import com.typesafe.tools.mima.plugin.MimaPlugin
 import sbt.Keys.{ initialCommands, parallelExecution }
 import spray.boilerplate.BoilerplatePlugin
diff --git a/kubernetes/.gitignore b/kubernetes/.gitignore
new file mode 100644
index 0000000000..d36977dc47
--- /dev/null
+++ b/kubernetes/.gitignore
@@ -0,0 +1 @@
+.tmp
diff --git a/kubernetes/create-cluster-gke.sh b/kubernetes/create-cluster-gke.sh
new file mode 100755
index 0000000000..26d2bdd595
--- /dev/null
+++ b/kubernetes/create-cluster-gke.sh
@@ -0,0 +1,80 @@
+#!/bin/bash -e
+
+# Copyright (C) 2016-2020 Lightbend Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Usage:
+# create-cluster-gke.sh [CLUSTER-NAME] [CLUSTER-VERSION]
+if [ $# -eq 0 ]
+  then
+    echo "No cluster name supplied"
+    echo "Usage: create-cluster-gke.sh [CLUSTER-NAME] (Optional)[CLUSTER-VERSION]"
+    exit 1
+fi
+
+gcloudZone=$(gcloud config get-value compute/zone)
+if [ "$gcloudZone" == "" ]
+  then
+    echo "No compute/zone set in your GCloud configuration"
+    echo "Please set a compute zone by running: gcloud config set compute/zone VALUE [optional flags]"
+    exit 1
+fi
+
+gcloudRegion=$(gcloud config get-value compute/region)
+if [ "$gcloudRegion" == "" ]
+  then
+    echo "No compute/region set in your GCloud configuration"
+    echo "Please set a compute region by running: gcloud config set compute/region VALUE [optional flags]"
+    exit 1
+fi
+
+gcloudProject=$(gcloud config get-value project)
+if [ "$gcloudProject" == "" ]
+  then
+    echo "No project set in your GCloud configuration"
+    echo "Please set a project by running: gcloud config set project VALUE"
+    exit 1
+fi
+
+CLUSTER_NAME=$1
+CLUSTER_VERSION=$2
+
+if [ -z "$CLUSTER_VERSION" ]
+  then
+    # https://cloud.google.com/kubernetes-engine/versioning-and-upgrades#versions_available_for_new_cluster_masters
+    CLUSTER_VERSION=$(gcloud container get-server-config --format="value(defaultClusterVersion)")
+    echo "No cluster version specified.
Using the default: $CLUSTER_VERSION" + else + echo "Cluster version: $CLUSTER_VERSION" +fi + +# Create cluster +gcloud container clusters create $CLUSTER_NAME \ + --cluster-version $CLUSTER_VERSION \ + --enable-ip-alias \ + --image-type cos \ + --machine-type n1-standard-4 \ + --num-nodes 5 \ + --no-enable-autoupgrade + + # --workload-pool=$gcloudProject.svc.id.goog # becoming default in next version, allows mapping of GCP service accounts to k8s service accounts + +## Wait for clusters to come up +echo "Waiting for cluster to become stable before continuing with the installation....." +gcloud compute instance-groups managed list --filter="name~gke-$CLUSTER_NAME" --format="value(name)" | while read -r line ; do + gcloud compute instance-groups managed wait-until --stable $line +done + +# Switch to new cluster +gcloud container clusters get-credentials $CLUSTER_NAME diff --git a/kubernetes/setup.sh b/kubernetes/setup.sh new file mode 100755 index 0000000000..e92777a3d9 --- /dev/null +++ b/kubernetes/setup.sh @@ -0,0 +1,21 @@ +#!/bin/bash +NUM_OF_NODES=$1 +DEST_HOST_FILE=$2 +TMP_DIR=.tmp + +kubectl delete deployments,services -l app=multi-node-test | true + +rm -rf ${DEST_HOST_FILE} +rm -rf ${TMP_DIR} +mkdir -p ${TMP_DIR} + +touch ${DEST_HOST_FILE} + +for i in `seq 1 "${NUM_OF_NODES}"`; +do + cat ./kubernetes/test-node-base.yaml | sed "s/test-nodeX/test-node${i}/" > ".tmp/test-node${i}.yml" + echo $i + echo "test-node${i}:/usr/local/openjdk-11/bin/java -Dmultinode.port=5000 -Dmultinode.udp-port=6000" >> ${DEST_HOST_FILE} +done + +kubectl apply -f ${TMP_DIR} diff --git a/kubernetes/test-node-base.yaml b/kubernetes/test-node-base.yaml new file mode 100644 index 0000000000..5506b3349f --- /dev/null +++ b/kubernetes/test-node-base.yaml @@ -0,0 +1,92 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: test-nodeX + labels: + app: multi-node-test +spec: + replicas: 1 + selector: + matchLabels: + host: test-nodeX + template: + metadata: + labels: + host: test-nodeX + spec: + containers: + - image: openjdk:11 + command: ["sleep", "infinity"] + resources: + requests: + memory: "2Gi" + cpu: "1" + limits: + memory: "2Gi" + lifecycle: + postStart: + exec: + command: + - /bin/sh + - -c + - truncate --size -1 /etc/hosts && echo " test-nodeX" >> /etc/hosts + imagePullPolicy: Always + name: multi-test-nodeX + volumeMounts: + - mountPath: /dev/shm + name: dshm + ports: + - name: web + containerPort: 80 + protocol: TCP + - name: ssh + containerPort: 22 + protocol: TCP + - name: multi-node + containerPort: 5000 + protocol: TCP + - name: multi-node2 + containerPort: 5001 + protocol: TCP + - name: multi-node-udp + containerPort: 6000 + protocol: UDP + - name: server-multi + containerPort: 4711 + protocol: TCP + volumes: + # Needed for Aeron tests: https://github.com/real-logic/aeron/blob/master/README.md#troubleshooting + - name: dshm + emptyDir: + medium: Memory +--- +apiVersion: v1 +kind: Service +metadata: + name: test-nodeX + labels: + app: multi-node-test +spec: + selector: + host: test-nodeX + ports: + - protocol: TCP + name: ssh + port: 22 + targetPort: 22 + - protocol: TCP + name: server-multi + port: 4711 + targetPort: 4711 + - protocol: TCP + name: multi-node + port: 5000 + targetPort: 5000 + - protocol: TCP + name: multi-node2 + port: 5001 + targetPort: 5001 + - protocol: UDP + name: multi-node-udp + port: 6000 + targetPort: 6000 diff --git a/project/Jvm.scala b/project/Jvm.scala new file mode 100644 index 0000000000..7ad089f6ce --- /dev/null +++ b/project/Jvm.scala @@ -0,0 +1,118 @@ +/* + * 
Copyright (C) 2009-2021 Lightbend Inc. + */ + +package com.typesafe.sbt.multijvm + +import java.io.File +import java.lang.{ ProcessBuilder => JProcessBuilder } + +import sbt._ +import scala.sys.process.Process + +object Jvm { + def startJvm( + javaBin: File, + jvmOptions: Seq[String], + runOptions: Seq[String], + logger: Logger, + connectInput: Boolean) = { + forkJava(javaBin, jvmOptions ++ runOptions, logger, connectInput) + } + + def forkJava(javaBin: File, options: Seq[String], logger: Logger, connectInput: Boolean) = { + val java = javaBin.toString + val command = (java :: options.toList).toArray + val builder = new JProcessBuilder(command: _*) + Process(builder).run(logger, connectInput) + } + + /** + * check if the current operating system is some OS + **/ + def isOS(os: String) = + try { + System.getProperty("os.name").toUpperCase.startsWith(os.toUpperCase) + } catch { + case _: Throwable => false + } + + /** + * convert to proper path for the operating system + **/ + def osPath(path: String) = if (isOS("WINDOWS")) Process(Seq("cygpath", path)).lineStream.mkString else path + + def getPodName(hostAndUser: String, sbtLogger: Logger): String = { + val command: Array[String] = + Array("kubectl", "get", "pods", "-l", s"host=$hostAndUser", "--no-headers", "-o", "name") + val builder = new JProcessBuilder(command: _*) + sbtLogger.debug("Jvm.getPodName about to run " + command.mkString(" ")) + val podName = Process(builder).!! + sbtLogger.debug("Jvm.getPodName podName is " + podName) + podName.stripPrefix("pod/").stripSuffix("\n") + } + + def syncJar(jarName: String, hostAndUser: String, remoteDir: String, sbtLogger: Logger): Process = { + val podName = getPodName(hostAndUser, sbtLogger) + val command: Array[String] = + Array("kubectl", "exec", podName, "--", "/bin/bash", "-c", s"rm -rf $remoteDir && mkdir -p $remoteDir") + val builder = new JProcessBuilder(command: _*) + sbtLogger.debug("Jvm.syncJar about to run " + command.mkString(" ")) + val process = Process(builder).run(sbtLogger, false) + if (process.exitValue() == 0) { + val command: Array[String] = Array("kubectl", "cp", osPath(jarName), podName + ":" + remoteDir + "/") + val builder = new JProcessBuilder(command: _*) + sbtLogger.debug("Jvm.syncJar about to run " + command.mkString(" ")) + Process(builder).run(sbtLogger, false) + } else { + process + } + } + + def forkRemoteJava( + java: String, + jvmOptions: Seq[String], + appOptions: Seq[String], + jarName: String, + hostAndUser: String, + remoteDir: String, + logger: Logger, + connectInput: Boolean, + sbtLogger: Logger): Process = { + val podName = getPodName(hostAndUser, sbtLogger) + sbtLogger.debug("About to use java " + java) + val shortJarName = new File(jarName).getName + val javaCommand = List(List(java), jvmOptions, List("-cp", shortJarName), appOptions).flatten + val command = Array( + "kubectl", + "exec", + podName, + "--", + "/bin/bash", + "-c", + ("cd " :: (remoteDir :: (" ; " :: javaCommand))).mkString(" ")) + sbtLogger.debug("Jvm.forkRemoteJava about to run " + command.mkString(" ")) + val builder = new JProcessBuilder(command: _*) + Process(builder).run(logger, connectInput) + } +} + +class JvmBasicLogger(name: String) extends BasicLogger { + def jvm(message: String) = "[%s] %s".format(name, message) + + def log(level: Level.Value, message: => String) = System.out.synchronized { + System.out.println(jvm(message)) + } + + def trace(t: => Throwable) = System.out.synchronized { + val traceLevel = getTrace + if (traceLevel >= 0) System.out.print(StackTrace.trimmed(t, 
traceLevel)) + } + + def success(message: => String) = log(Level.Info, message) + def control(event: ControlEvent.Value, message: => String) = log(Level.Info, message) + + def logAll(events: Seq[LogEvent]) = System.out.synchronized { events.foreach(log) } +} + +final class JvmLogger(name: String) extends JvmBasicLogger(name) diff --git a/project/MultiNode.scala b/project/MultiNode.scala index 293a2da54c..ac3953eaaa 100644 --- a/project/MultiNode.scala +++ b/project/MultiNode.scala @@ -6,8 +6,8 @@ package akka import akka.TestExtras.Filter.Keys._ import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys.multiJvmCreateLogger -import com.typesafe.sbt.SbtMultiJvm -import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys._ +import com.typesafe.sbt.{ MultiJvmPlugin => SbtMultiJvm } +import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys._ import sbt.{ Def, _ } import sbt.Keys._ import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._ diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala new file mode 100644 index 0000000000..7bc97bd51b --- /dev/null +++ b/project/SbtMultiJvm.scala @@ -0,0 +1,543 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package com.typesafe.sbt + +import com.typesafe.sbt.multijvm.{ Jvm, JvmLogger } +import scala.sys.process.Process +import sjsonnew.BasicJsonProtocol._ +import sbt._ +import Keys._ +import java.io.File +import java.lang.Boolean.getBoolean + +import scala.Console.{ GREEN, RESET } + +import sbtassembly.AssemblyPlugin.assemblySettings +import sbtassembly.{ AssemblyKeys, MergeStrategy } +import AssemblyKeys._ + +object MultiJvmPlugin extends AutoPlugin { + + case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String]) + + object MultiJvmKeys { + val MultiJvm = config("multi-jvm").extend(Test) + + val multiJvmMarker = SettingKey[String]("multi-jvm-marker") + + val multiJvmTests = TaskKey[Map[String, Seq[String]]]("multi-jvm-tests") + val multiJvmTestNames = TaskKey[Seq[String]]("multi-jvm-test-names") + + val multiJvmApps = TaskKey[Map[String, Seq[String]]]("multi-jvm-apps") + val multiJvmAppNames = TaskKey[Seq[String]]("multi-jvm-app-names") + + val multiJvmJavaCommand = TaskKey[File]("multi-jvm-java-command") + + val jvmOptions = TaskKey[Seq[String]]("jvm-options") // TODO: shouldn't that be regular `javaOptions`? 
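+    // extraOptions (next) is keyed by the simple MultiJvm class name, so
+    // per-test JVM flags can be derived from the test being launched; it is
+    // applied as options.extra(className) in multi() and multiNode() below.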
+ val extraOptions = SettingKey[String => Seq[String]]("extra-options") + val multiJvmCreateLogger = TaskKey[String => Logger]("multi-jvm-create-logger") + + val scalatestRunner = SettingKey[String]("scalatest-runner") + val scalatestOptions = SettingKey[Seq[String]]("scalatest-options") + val scalatestClasspath = TaskKey[Classpath]("scalatest-classpath") + val scalatestScalaOptions = TaskKey[String => Seq[String]]("scalatest-scala-options") + val scalatestMultiNodeScalaOptions = TaskKey[String => Seq[String]]("scalatest-multi-node-scala-options") + val multiTestOptions = TaskKey[Options]("multi-test-options") + val multiNodeTestOptions = TaskKey[Options]("multi-node-test-options") + + val appScalaOptions = TaskKey[String => Seq[String]]("app-scala-options") + val multiRunOptions = TaskKey[Options]("multi-run-options") + + val multiRunCopiedClassLocation = SettingKey[File]("multi-run-copied-class-location") + + val multiJvmTestJar = TaskKey[String]("multi-jvm-test-jar") + val multiJvmTestJarName = TaskKey[String]("multi-jvm-test-jar-name") + + val multiNodeTest = TaskKey[Unit]("multi-node-test") + val multiNodeExecuteTests = TaskKey[Tests.Output]("multi-node-execute-tests") + val multiNodeTestOnly = InputKey[Unit]("multi-node-test-only") + + val multiNodeHosts = SettingKey[Seq[String]]("multi-node-hosts") + val multiNodeHostsFileName = SettingKey[String]("multi-node-hosts-file-name") + val multiNodeProcessedHosts = TaskKey[(IndexedSeq[String], IndexedSeq[String])]("multi-node-processed-hosts") + val multiNodeTargetDirName = SettingKey[String]("multi-node-target-dir-name") + val multiNodeJavaName = SettingKey[String]("multi-node-java-name") + + // TODO fugly workaround for now + val multiNodeWorkAround = + TaskKey[(String, (IndexedSeq[String], IndexedSeq[String]), String)]("multi-node-workaround") + } + + val autoImport = MultiJvmKeys + + import MultiJvmKeys._ + + override def requires = plugins.JvmPlugin + + override def projectConfigurations = Seq(MultiJvm) + + override def projectSettings = multiJvmSettings + + private[this] def noTestsMessage(scoped: ScopedKey[_])(implicit display: Show[ScopedKey[_]]): String = + "No tests to run for " + display.show(scoped) + + lazy val multiJvmSettings: Seq[Def.Setting[_]] = + inConfig(MultiJvm)(Defaults.configSettings ++ internalMultiJvmSettings) + + // https://github.com/sbt/sbt/blob/v0.13.15/main/actions/src/main/scala/sbt/Tests.scala#L296-L298 + private[this] def showResults(log: Logger, results: Tests.Output, noTestsMessage: => String): Unit = + TestResultLogger.Default.copy(printNoTests = TestResultLogger.const(_.info(noTestsMessage))).run(log, results, "") + + private def internalMultiJvmSettings = + assemblySettings ++ Seq( + multiJvmMarker := "MultiJvm", + loadedTestFrameworks := (loadedTestFrameworks in Test).value, + definedTests := Defaults.detectTests.value, + multiJvmTests := collectMultiJvm(definedTests.value.map(_.name), multiJvmMarker.value), + multiJvmTestNames := multiJvmTests.map(_.keys.toSeq).storeAs(multiJvmTestNames).triggeredBy(compile).value, + multiJvmApps := collectMultiJvm(discoveredMainClasses.value, multiJvmMarker.value), + multiJvmAppNames := multiJvmApps.map(_.keys.toSeq).storeAs(multiJvmAppNames).triggeredBy(compile).value, + multiJvmJavaCommand := javaCommand(javaHome.value, "java"), + jvmOptions := Seq.empty, + extraOptions := { (name: String) => + Seq.empty + }, + multiJvmCreateLogger := { (name: String) => + new JvmLogger(name) + }, + scalatestRunner := "org.scalatest.tools.Runner", + scalatestOptions := 
defaultScalatestOptions, + scalatestClasspath := managedClasspath.value.filter(_.data.name.contains("scalatest")), + multiRunCopiedClassLocation := new File(target.value, "multi-run-copied-libraries"), + scalatestScalaOptions := scalaOptionsForScalatest( + scalatestRunner.value, + scalatestOptions.value, + fullClasspath.value, + multiRunCopiedClassLocation.value), + scalatestMultiNodeScalaOptions := scalaMultiNodeOptionsForScalatest( + scalatestRunner.value, + scalatestOptions.value), + multiTestOptions := Options(jvmOptions.value, extraOptions.value, scalatestScalaOptions.value), + multiNodeTestOptions := Options(jvmOptions.value, extraOptions.value, scalatestMultiNodeScalaOptions.value), + appScalaOptions := scalaOptionsForApps(fullClasspath.value), + connectInput := true, + multiRunOptions := Options(jvmOptions.value, extraOptions.value, appScalaOptions.value), + executeTests := multiJvmExecuteTests.value, + testOnly := multiJvmTestOnly.evaluated, + test := showResults(streams.value.log, executeTests.value, "No tests to run for MultiJvm"), + run := multiJvmRun.evaluated, + runMain := multiJvmRun.evaluated, + // TODO try to make sure that this is only generated on a need to have basis + multiJvmTestJar := (assemblyOutputPath in assembly).map(_.getAbsolutePath).dependsOn(assembly).value, + multiJvmTestJarName := (assemblyOutputPath in assembly).value.getAbsolutePath, + multiNodeTest := { + implicit val display = Project.showContextKey(state.value) + showResults(streams.value.log, multiNodeExecuteTests.value, noTestsMessage(resolvedScoped.value)) + }, + multiNodeExecuteTests := multiNodeExecuteTestsTask.value, + multiNodeTestOnly := multiNodeTestOnlyTask.evaluated, + multiNodeHosts := Seq.empty, + multiNodeHostsFileName := "multi-node-test.hosts", + multiNodeProcessedHosts := processMultiNodeHosts( + multiNodeHosts.value, + multiNodeHostsFileName.value, + multiNodeJavaName.value, + streams.value), + multiNodeTargetDirName := "multi-node-test", + multiNodeJavaName := "java", + // TODO there must be a way get at keys in the tasks that I just don't get + multiNodeWorkAround := (multiJvmTestJar.value, multiNodeProcessedHosts.value, multiNodeTargetDirName.value), + // here follows the assembly parts of the config + // don't run the tests when creating the assembly + test in assembly := {}, + // we want everything including the tests and test frameworks + fullClasspath in assembly := (fullClasspath in MultiJvm).value, + // the first class wins just like a classpath + // just concatenate conflicting text files + assemblyMergeStrategy in assembly := { + case n if n.endsWith(".class") => MergeStrategy.first + case n if n.endsWith(".txt") => MergeStrategy.concat + case n if n.endsWith("NOTICE") => MergeStrategy.concat + case n => (assemblyMergeStrategy in assembly).value.apply(n) + }, + assemblyJarName in assembly := { + name.value + "_" + scalaVersion.value + "-" + version.value + "-multi-jvm-assembly.jar" + }) + + def collectMultiJvm(discovered: Seq[String], marker: String): Map[String, Seq[String]] = { + val found = discovered.filter(_.contains(marker)).groupBy(multiName(_, marker)) + found.map { + case (key, values) => + val totalNodes = sys.props.get(marker + "." 
+ key + ".nrOfNodes").getOrElse(values.size.toString).toInt + val sortedClasses = values.sorted + val totalClasses = sortedClasses.padTo(totalNodes, sortedClasses.last) + (key, totalClasses) + } + } + + def multiName(name: String, marker: String) = name.split(marker).head + + def multiSimpleName(name: String) = name.split("\\.").last + + def javaCommand(javaHome: Option[File], name: String): File = { + val home = javaHome.getOrElse(new File(System.getProperty("java.home"))) + new File(new File(home, "bin"), name) + } + + def defaultScalatestOptions: Seq[String] = { + if (getBoolean("sbt.log.noformat")) Seq("-oW") else Seq("-o") + } + + def scalaOptionsForScalatest( + runner: String, + options: Seq[String], + fullClasspath: Classpath, + multiRunCopiedClassDir: File) = { + val directoryBasedClasspathEntries = fullClasspath.files.filter(_.isDirectory) + // Copy over just the jars to this folder. + fullClasspath.files + .filter(_.isFile) + .foreach(classpathFile => + IO.copyFile(classpathFile, new File(multiRunCopiedClassDir, classpathFile.getName), true)) + val cp = directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File.separator + "*" + (testClass: String) => { Seq("-cp", cp, runner, "-s", testClass) ++ options } + } + + def scalaMultiNodeOptionsForScalatest(runner: String, options: Seq[String]) = { (testClass: String) => + { Seq(runner, "-s", testClass) ++ options } + } + + def scalaOptionsForApps(classpath: Classpath) = { + val cp = classpath.files.absString + (mainClass: String) => Seq("-cp", cp, mainClass) + } + + def multiJvmExecuteTests: Def.Initialize[sbt.Task[Tests.Output]] = Def.task { + runMultiJvmTests( + multiJvmTests.value, + multiJvmMarker.value, + multiJvmJavaCommand.value, + multiTestOptions.value, + sourceDirectory.value, + multiJvmCreateLogger.value, + streams.value.log) + } + + def multiJvmTestOnly: Def.Initialize[sbt.InputTask[Unit]] = + InputTask.createDyn(loadForParser(multiJvmTestNames)((s, i) => Defaults.testOnlyParser(s, i.getOrElse(Nil)))) { + Def.task { + case (selection, _extraOptions) => + val s = streams.value + val options = multiTestOptions.value + val opts = options.copy(extra = (s: String) => { options.extra(s) ++ _extraOptions }) + val filters = selection.map(GlobFilter(_)) + val tests = multiJvmTests.value.filterKeys(name => filters.exists(_.accept(name))) + Def.task { + val results = runMultiJvmTests( + tests, + multiJvmMarker.value, + multiJvmJavaCommand.value, + opts, + sourceDirectory.value, + multiJvmCreateLogger.value, + s.log) + showResults(s.log, results, "No tests to run for MultiJvm") + } + } + } + + def runMultiJvmTests( + tests: Map[String, Seq[String]], + marker: String, + javaBin: File, + options: Options, + srcDir: File, + createLogger: String => Logger, + log: Logger): Tests.Output = { + val results = + if (tests.isEmpty) + List() + else + tests.map { + case (_name, classes) => multi(_name, classes, marker, javaBin, options, srcDir, false, createLogger, log) + } + Tests.Output( + Tests.overall(results.map(_._2)), + Map.empty, + results.map(result => Tests.Summary("multi-jvm", result._1))) + } + + def multiJvmRun: Def.Initialize[sbt.InputTask[Unit]] = + InputTask.createDyn(loadForParser(multiJvmAppNames)((s, i) => runParser(s, i.getOrElse(Nil)))) { + Def.task { + val s = streams.value + val apps = multiJvmApps.value + val j = multiJvmJavaCommand.value + val c = connectInput.value + val dir = sourceDirectory.value + val options = multiRunOptions.value + val marker = multiJvmMarker.value + 
val createLogger = multiJvmCreateLogger.value + + result => { + val classes = apps.getOrElse(result, Seq.empty) + Def.task { + if (classes.isEmpty) s.log.info("No apps to run.") + else multi(result, classes, marker, j, options, dir, c, createLogger, s.log) + } + } + } + } + + def runParser: (State, Seq[String]) => complete.Parser[String] = { + import complete.DefaultParsers._ + (state, appClasses) => Space ~> token(NotSpace.examples(appClasses.toSet)) + } + + def multi( + name: String, + classes: Seq[String], + marker: String, + javaBin: File, + options: Options, + srcDir: File, + input: Boolean, + createLogger: String => Logger, + log: Logger): (String, sbt.TestResult) = { + val logName = "* " + name + log.info(if (log.ansiCodesSupported) GREEN + logName + RESET else logName) + val classesHostsJavas = getClassesHostsJavas(classes, IndexedSeq.empty, IndexedSeq.empty, "") + val hosts = classesHostsJavas.map(_._2) + val processes = classes.zipWithIndex.map { + case (testClass, index) => + val className = multiSimpleName(testClass) + val jvmName = "JVM-" + (index + 1) + "-" + className + val jvmLogger = createLogger(jvmName) + val optionsFile = (srcDir ** (className + ".opts")).get.headOption + val optionsFromFile = + optionsFile.map(IO.read(_)).map(_.trim.replace("\\n", " ").split("\\s+").toList).getOrElse(Seq.empty[String]) + val multiNodeOptions = getMultiNodeCommandLineOptions(hosts, index, classes.size) + val allJvmOptions = options.jvm ++ multiNodeOptions ++ optionsFromFile ++ options.extra(className) + val runOptions = options.run(testClass) + val connectInput = input && index == 0 + log.debug("Starting %s for %s".format(jvmName, testClass)) + log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" "))) + (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput)) + } + processExitCodes(name, processes, log) + } + + def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = { + val exitCodes = processes.map { + case (testClass, process) => (testClass, process.exitValue()) + } + val failures = exitCodes.flatMap { + case (testClass, exit) if exit > 0 => Some("Failed: " + testClass) + case _ => None + } + failures.foreach(log.error(_)) + (name, if (failures.nonEmpty) TestResult.Failed else TestResult.Passed) + } + + def multiNodeExecuteTestsTask: Def.Initialize[sbt.Task[Tests.Output]] = Def.task { + val (_jarName, (hostsAndUsers, javas), targetDir) = multiNodeWorkAround.value + runMultiNodeTests( + multiJvmTests.value, + multiJvmMarker.value, + multiNodeJavaName.value, + multiNodeTestOptions.value, + sourceDirectory.value, + _jarName, + hostsAndUsers, + javas, + targetDir, + multiJvmCreateLogger.value, + streams.value.log) + } + + def multiNodeTestOnlyTask: Def.Initialize[InputTask[Unit]] = + InputTask.createDyn(loadForParser(multiJvmTestNames)((s, i) => Defaults.testOnlyParser(s, i.getOrElse(Nil)))) { + Def.task { + case (selected, _extraOptions) => + val options = multiNodeTestOptions.value + val (_jarName, (hostsAndUsers, javas), targetDir) = multiNodeWorkAround.value + val s = streams.value + val opts = options.copy(extra = (s: String) => { options.extra(s) ++ _extraOptions }) + val tests = selected.flatMap { name => + multiJvmTests.value.get(name).map((name, _)) + } + Def.task { + val results = runMultiNodeTests( + tests.toMap, + multiJvmMarker.value, + multiNodeJavaName.value, + opts, + sourceDirectory.value, + _jarName, + hostsAndUsers, + javas, + targetDir, + multiJvmCreateLogger.value, + 
s.log) + showResults(s.log, results, "No tests to run for MultiNode") + } + } + } + + def runMultiNodeTests( + tests: Map[String, Seq[String]], + marker: String, + java: String, + options: Options, + srcDir: File, + jarName: String, + hostsAndUsers: IndexedSeq[String], + javas: IndexedSeq[String], + targetDir: String, + createLogger: String => Logger, + log: Logger): Tests.Output = { + val results = + if (tests.isEmpty) + List() + else + tests.map { + case (_name, classes) => + multiNode( + _name, + classes, + marker, + java, + options, + srcDir, + false, + jarName, + hostsAndUsers, + javas, + targetDir, + createLogger, + log) + } + Tests.Output( + Tests.overall(results.map(_._2)), + Map.empty, + results.map(result => Tests.Summary("multi-jvm", result._1))) + } + + def multiNode( + name: String, + classes: Seq[String], + marker: String, + defaultJava: String, + options: Options, + srcDir: File, + input: Boolean, + testJar: String, + hostsAndUsers: IndexedSeq[String], + javas: IndexedSeq[String], + targetDir: String, + createLogger: String => Logger, + log: Logger): (String, sbt.TestResult) = { + val logName = "* " + name + log.info(if (log.ansiCodesSupported) GREEN + logName + RESET else logName) + val classesHostsJavas = getClassesHostsJavas(classes, hostsAndUsers, javas, defaultJava) + val hosts = classesHostsJavas.map(_._2) + // TODO move this out, maybe to the hosts string as well? + val syncProcesses = classesHostsJavas.map { + case ((testClass, hostAndUser, java)) => + (testClass + " sync", Jvm.syncJar(testJar, hostAndUser, targetDir, log)) + } + val syncResult = processExitCodes(name, syncProcesses, log) + if (syncResult._2 == TestResult.Passed) { + val processes = classesHostsJavas.zipWithIndex.map { + case ((testClass, hostAndUser, java), index) => { + val jvmName = "JVM-" + (index + 1) + val jvmLogger = createLogger(jvmName) + val className = multiSimpleName(testClass) + val optionsFile = (srcDir ** (className + ".opts")).get.headOption + val optionsFromFile = optionsFile + .map(IO.read(_)) + .map(_.trim.replace("\\n", " ").split("\\s+").toList) + .getOrElse(Seq.empty[String]) + val multiNodeOptions = getMultiNodeCommandLineOptions(hosts, index, classes.size) + val allJvmOptions = options.jvm ++ optionsFromFile ++ options.extra(className) ++ multiNodeOptions + val runOptions = options.run(testClass) + val connectInput = input && index == 0 + log.debug("Starting %s for %s".format(jvmName, testClass)) + log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" "))) + ( + testClass, + Jvm.forkRemoteJava( + java, + allJvmOptions, + runOptions, + testJar, + hostAndUser, + targetDir, + jvmLogger, + connectInput, + log)) + } + } + processExitCodes(name, processes, log) + } else { + syncResult + } + } + + private def padSeqOrDefaultTo(seq: IndexedSeq[String], default: String, max: Int): IndexedSeq[String] = { + val realSeq = if (seq.isEmpty) IndexedSeq(default) else seq + if (realSeq.size >= max) + realSeq + else + (realSeq /: (0 until (max - realSeq.size)))((mySeq, pos) => mySeq :+ realSeq(pos % realSeq.size)) + } + + private def getClassesHostsJavas( + classes: Seq[String], + hostsAndUsers: IndexedSeq[String], + javas: IndexedSeq[String], + defaultJava: String): IndexedSeq[(String, String, String)] = { + val max = classes.length + val tuple = ( + classes.toIndexedSeq, + padSeqOrDefaultTo(hostsAndUsers, "localhost", max), + padSeqOrDefaultTo(javas, defaultJava, max)) + tuple.zipped.map { case (className: String, hostAndUser: String, _java: String) => (className, hostAndUser, 
_java) }
+  }
+
+  private def getMultiNodeCommandLineOptions(hosts: Seq[String], index: Int, maxNodes: Int): Seq[String] = {
+    Seq(
+      "-Dmultinode.max-nodes=" + maxNodes,
+      "-Dmultinode.server-host=" + hosts(0).split("@").last,
+      "-Dmultinode.host=" + hosts(index).split("@").last,
+      "-Dmultinode.index=" + index)
+  }
+
+  private def processMultiNodeHosts(
+      hosts: Seq[String],
+      hostsFileName: String,
+      defaultJava: String,
+      s: Types.Id[Keys.TaskStreams]): (IndexedSeq[String], IndexedSeq[String]) = {
+    val hostsFile = new File(hostsFileName)
+    val theHosts: IndexedSeq[String] =
+      if (hosts.isEmpty) {
+        if (hostsFile.exists && hostsFile.canRead) {
+          s.log.info("Using hosts defined in file " + hostsFile.getAbsolutePath)
+          IO.readLines(hostsFile).map(_.trim).filter(_.length > 0).toIndexedSeq
+        } else
+          hosts.toIndexedSeq
+      } else {
+        if (hostsFile.exists && hostsFile.canRead)
+          s.log.info(
+            "Hosts from setting " + multiNodeHosts.key.label + " is overriding file " + hostsFile.getAbsolutePath)
+        hosts.toIndexedSeq
+      }
+
+    theHosts.map { x =>
+      val elems = x.split(":").toList.take(2).padTo(2, defaultJava)
+      (elems(0), elems(1))
+    } unzip
+  }
+}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index fa8f782e26..0c0ef1eb7b 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -3,11 +3,6 @@ libraryDependencies += Defaults.sbtPluginExtra(
   (pluginCrossBuild / sbtBinaryVersion).value,
   (pluginCrossBuild / scalaBinaryVersion).value)
 
-// these comment markers are for including code into the docs
-//#sbt-multi-jvm
-addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
-//#sbt-multi-jvm
-
 addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0")
 addSbtPlugin("com.lightbend.sbt" % "sbt-bill-of-materials" % "1.0.2")
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
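For reference, a sketch (illustrative values only) of how one line of `multi-node-test.hosts`, as generated by `kubernetes/setup.sh`, is consumed by `processMultiNodeHosts` above: everything before the first `:` is the host (optionally `user@host`), the remainder is the java command, and anything after a second `:` would be dropped by `take(2)`:

```scala
// Illustrative only — mirrors the split in processMultiNodeHosts.
val line = "test-node1:/usr/local/openjdk-11/bin/java -Dmultinode.port=5000 -Dmultinode.udp-port=6000"
val elems = line.split(":").toList.take(2).padTo(2, "java") // "java" = multiNodeJavaName default
val (host, java) = (elems(0), elems(1))
assert(host == "test-node1")
assert(java.startsWith("/usr/local/openjdk-11/bin/java"))
```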