Run multi-node tests on fresh GKE clusters (#30570)

* Port multi-node-test from Jenkins
* link to issues for gh-excluded tests
* use kubectl in multi-jvm plugin
This commit is contained in:
Andrea Peruffo 2021-08-27 16:40:51 +01:00 committed by GitHub
parent 210912e916
commit d75ad252b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1087 additions and 66 deletions

80
.github/workflows/multi-node.yml vendored Normal file
View file

@ -0,0 +1,80 @@
name: Multi node test
on:
schedule:
- cron: '0 0 * * *'
concurrency:
# Only run once for latest commit per ref and cancel other (previous) runs.
group: ci-multi-node-${{ github.ref }}
cancel-in-progress: true
jobs:
run-multi-node-tests:
name: Multi Node Test
runs-on: ubuntu-20.04
if: github.repository == 'akka/akka'
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Install Kubectl
run: |
sudo snap install kubectl --classic
- uses: google-github-actions/setup-gcloud@v0.2
with:
service_account_key: ${{ secrets.GKE_SA_KEY }}
project_id: ${{ secrets.GKE_PROJECT }}
- name: Create the cluster
run: |-
gcloud config set compute/region us-central1
gcloud config set compute/zone us-central1-c
./kubernetes/create-cluster-gke.sh "akka-multi-node-${GITHUB_RUN_ID}"
- name: Setup Pods
run: |
# Stress tests are using 13 nodes
./kubernetes/setup.sh 15 multi-node-test.hosts
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11.0-9
- name: Cache Coursier cache
uses: coursier/cache-action@v6.2
- name: Multi node test
run: |
cat multi-node-test.hosts
sbt -jvm-opts .jvmopts-ci \
-Dakka.test.timefactor=2 \
-Dakka.cluster.assert=on \
-Dsbt.override.build.repos=false \
-Dakka.test.tags.exclude=gh-exclude \
-Dakka.test.multi-node=true \
-Dakka.test.multi-node.targetDirName=${PWD}/target/${{ github.run_id }} \
-Dakka.test.multi-node.java=${JAVA_HOME}/bin/java \
-Dmultinode.XX:MetaspaceSize=128M \
-Dmultinode.Xms512M \
-Dmultinode.Xmx1536G \
-Dmultinode.Xlog:gc \
multiNodeTest
- name: Email on failure
if: ${{ failure() }}
uses: dawidd6/action-send-mail@v3
with:
server_address: smtp.gmail.com
server_port: 465
# Using port 465 already sets `secure: true`
secure: true
username: ${{secrets.MAIL_USERNAME}}
password: ${{secrets.MAIL_PASSWORD}}
subject: Multi node test (Akka)
to: akka.official@gmail.com
from: Akka CI (GHActions)
body: |
Multi node test of ${{github.repository}} failed!
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
- name: Cleanup the environment
if: ${{ always() }}
shell: bash {0}
run: |
gcloud container clusters delete "akka-multi-node-${GITHUB_RUN_ID}" --quiet

6
.gitignore vendored
View file

@ -90,8 +90,14 @@ test-output
factorials.txt factorials.txt
factorial2.txt factorial2.txt
all-projects.txt
multi-node-projects.txt
/multi-node-test.hosts
# Default sigar library extract location. # Default sigar library extract location.
native/ native/
/dumps/ /dumps/
/core /core
.tmp

View file

@ -1,6 +1,13 @@
# This is used to configure the sbt instance that github actions launches # This is used to configure the sbt instance in CI
-Xms2G -XX:+UseG1GC
-Xmx2G -Xms3G
-Xmx3G
-Xss2M -Xss2M
-XX:ReservedCodeCacheSize=256m -XX:ReservedCodeCacheSize=256m
-XX:MaxGCPauseMillis=750
-XX:-UseBiasedLocking
-XX:+UseCompressedOops
-XX:MetaspaceSize=512M
-XX:-ClassUnloadingWithConcurrentMark
-Djava.security.egd=file:/dev/./urandom

View file

@ -6,6 +6,7 @@ package akka.cluster.metrics
import java.lang.management.ManagementFactory import java.lang.management.ManagementFactory
import scala.annotation.nowarn
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -25,7 +26,7 @@ import akka.routing.FromConfig
import akka.routing.GetRoutees import akka.routing.GetRoutees
import akka.routing.Routees import akka.routing.Routees
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import akka.testkit.{ DefaultTimeout, ImplicitSender, LongRunningTest } import akka.testkit.{ DefaultTimeout, GHExcludeTest, ImplicitSender, LongRunningTest }
import akka.util.unused import akka.util.unused
object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig { object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig {
@ -118,6 +119,7 @@ class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRout
class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec
@nowarn
abstract class AdaptiveLoadBalancingRouterSpec abstract class AdaptiveLoadBalancingRouterSpec
extends MultiNodeSpec(AdaptiveLoadBalancingRouterConfig) extends MultiNodeSpec(AdaptiveLoadBalancingRouterConfig)
with MultiNodeClusterSpec with MultiNodeClusterSpec
@ -170,7 +172,8 @@ abstract class AdaptiveLoadBalancingRouterSpec
enterBarrier("after-1") enterBarrier("after-1")
} }
"use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"use all nodes in the cluster when not overloaded" taggedAs (LongRunningTest, GHExcludeTest) in {
runOn(node1) { runOn(node1) {
val router1 = startRouter("router1") val router1 = startRouter("router1")
@ -196,7 +199,8 @@ abstract class AdaptiveLoadBalancingRouterSpec
enterBarrier("after-2") enterBarrier("after-2")
} }
"prefer node with more free heap capacity" taggedAs LongRunningTest in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"prefer node with more free heap capacity" taggedAs (LongRunningTest, GHExcludeTest) in {
System.gc() System.gc()
enterBarrier("gc") enterBarrier("gc")
@ -229,7 +233,8 @@ abstract class AdaptiveLoadBalancingRouterSpec
enterBarrier("after-3") enterBarrier("after-3")
} }
"create routees from configuration" taggedAs LongRunningTest in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"create routees from configuration" taggedAs (LongRunningTest, GHExcludeTest) in {
runOn(node1) { runOn(node1) {
val router3 = system.actorOf(FromConfig.props(Props[Memory]()), "router3") val router3 = system.actorOf(FromConfig.props(Props[Memory]()), "router3")
// it may take some time until router receives cluster member events // it may take some time until router receives cluster member events
@ -240,7 +245,8 @@ abstract class AdaptiveLoadBalancingRouterSpec
enterBarrier("after-4") enterBarrier("after-4")
} }
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"create routees from cluster.enabled configuration" taggedAs (LongRunningTest, GHExcludeTest) in {
runOn(node1) { runOn(node1) {
val router4 = system.actorOf(FromConfig.props(Props[Memory]()), "router4") val router4 = system.actorOf(FromConfig.props(Props[Memory]()), "router4")
// it may take some time until router receives cluster member events // it may take some time until router receives cluster member events

View file

@ -177,7 +177,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
enterBarrier(s"after-start-stop-${testRun}") enterBarrier(s"after-start-stop-${testRun}")
} }
"test when starting new entity" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test when starting new entity" taggedAs GHExcludeTest in {
val numberOfMessages = 200 * NrOfMessagesFactor val numberOfMessages = 200 * NrOfMessagesFactor
runBench("start new entities") { (iteration, region, histogram) => runBench("start new entities") { (iteration, region, histogram) =>
(1 to numberOfMessages).foreach { n => (1 to numberOfMessages).foreach { n =>
@ -190,7 +191,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
} }
} }
"test latency when starting new entity and sending a few messages" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test latency when starting new entity and sending a few messages" taggedAs GHExcludeTest in {
val numberOfMessages = 800 * NrOfMessagesFactor val numberOfMessages = 800 * NrOfMessagesFactor
runBench("start, few messages") { (iteration, region, histogram) => runBench("start, few messages") { (iteration, region, histogram) =>
for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) { for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) {
@ -203,7 +205,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
} }
} }
"test latency when starting new entity and sending a few messages to it and stopping" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test latency when starting new entity and sending a few messages to it and stopping" taggedAs GHExcludeTest in {
val numberOfMessages = 800 * NrOfMessagesFactor val numberOfMessages = 800 * NrOfMessagesFactor
// 160 entities, and an extra one for the intialization // 160 entities, and an extra one for the intialization
// all but the first one are not removed // all but the first one are not removed
@ -237,7 +240,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
} }
} }
"test latency when starting, few messages, stopping, few messages" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test latency when starting, few messages, stopping, few messages" taggedAs GHExcludeTest in {
val numberOfMessages = 800 * NrOfMessagesFactor val numberOfMessages = 800 * NrOfMessagesFactor
runBench("start, few messages, stop, few messages") { (iteration, region, histogram) => runBench("start, few messages, stop, few messages") { (iteration, region, histogram) =>
for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) { for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) {
@ -260,7 +264,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
} }
} }
"test when starting some new entities mixed with sending to started" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test when starting some new entities mixed with sending to started" taggedAs GHExcludeTest in {
runBench("starting mixed with sending to started") { (iteration, region, histogram) => runBench("starting mixed with sending to started") { (iteration, region, histogram) =>
val numberOfMessages = 1600 * NrOfMessagesFactor val numberOfMessages = 1600 * NrOfMessagesFactor
(1 to numberOfMessages).foreach { n => (1 to numberOfMessages).foreach { n =>
@ -284,7 +289,8 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
} }
} }
"test sending to started" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"test sending to started" taggedAs GHExcludeTest in {
runBench("sending to started") { (iteration, region, histogram) => runBench("sending to started") { (iteration, region, histogram) =>
val numberOfMessages = 1600 * NrOfMessagesFactor val numberOfMessages = 1600 * NrOfMessagesFactor
(1 to numberOfMessages).foreach { n => (1 to numberOfMessages).foreach { n =>

View file

@ -21,6 +21,7 @@ import akka.cluster.typed.PrepareForFullClusterShutdown
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import akka.testkit.GHExcludeTest
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -78,7 +79,8 @@ class ClusterShardingPreparingForShutdownSpec
formCluster(first, second, third) formCluster(first, second, third)
} }
"not rebalance but should still work preparing for shutdown" in { // Excluded on GH Actions: https://github.com/akka/akka/issues/30486
"not rebalance but should still work preparing for shutdown" taggedAs GHExcludeTest in {
val shardRegion: ActorRef[ShardingEnvelope[Command]] = val shardRegion: ActorRef[ShardingEnvelope[Command]] =
sharding.init(Entity(typeKey)(_ => Pinger())) sharding.init(Entity(typeKey)(_ => Pinger()))

View file

@ -130,8 +130,7 @@ class RandomizedSplitBrainResolverIntegrationSpec
c += 1 c += 1
val sys: ActorSystem = { val sys: ActorSystem = {
val sys = ActorSystem(system.name + "-" + c, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
val sys = ActorSystem(system.name + "-" + c, system.settings.config)
val gremlinController = sys.actorOf(GremlinController.props, "gremlinController") val gremlinController = sys.actorOf(GremlinController.props, "gremlinController")
system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c") system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c")
sys sys

View file

@ -120,9 +120,10 @@ class SplitBrainResolverIntegrationSpec
val sys = ActorSystem( val sys = ActorSystem(
system.name + "-" + c, system.name + "-" + c,
scenario.cfg MultiNodeSpec.configureNextPortIfFixed(
.withValue("akka.cluster.multi-data-center.self-data-center", ConfigValueFactory.fromAnyRef(dcName)) scenario.cfg
.withFallback(system.settings.config)) .withValue("akka.cluster.multi-data-center.self-data-center", ConfigValueFactory.fromAnyRef(dcName))
.withFallback(system.settings.config)))
val gremlinController = sys.actorOf(GremlinController.props, "gremlinController") val gremlinController = sys.actorOf(GremlinController.props, "gremlinController")
system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c") system.actorOf(GremlinControllerProxy.props(gremlinController), s"gremlinControllerProxy-$c")
sys sys

View file

@ -11,6 +11,7 @@ import com.typesafe.config.ConfigFactory
import akka.actor._ import akka.actor._
import akka.cluster.{ Cluster, MemberStatus } import akka.cluster.{ Cluster, MemberStatus }
import akka.testkit._ import akka.testkit._
import akka.remote.testkit.MultiNodeSpec
import akka.util.ccompat._ import akka.util.ccompat._
@ccompatUsedUntil213 @ccompatUsedUntil213
@ -200,7 +201,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard
} }
// no nodes left of the original cluster, start a new cluster // no nodes left of the original cluster, start a new cluster
val sys2 = ActorSystem(system.name, system.settings.config) val sys2 = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
val entityProbe2 = TestProbe()(sys2) val entityProbe2 = TestProbe()(sys2)
val probe2 = TestProbe()(sys2) val probe2 = TestProbe()(sys2)

View file

@ -46,6 +46,7 @@ abstract class QuickRestartSpec
val rounds = 3 val rounds = 3
override def verifySystemShutdown: Boolean = true
override def expectedTestDuration: FiniteDuration = 45.seconds * rounds override def expectedTestDuration: FiniteDuration = 45.seconds * rounds
"Quickly restarting node" must { "Quickly restarting node" must {
@ -61,20 +62,21 @@ abstract class QuickRestartSpec
for (n <- 1 to rounds) { for (n <- 1 to rounds) {
log.info("round-" + n) log.info("round-" + n)
runOn(second) { runOn(second) {
restartingSystem = restartingSystem = if (restartingSystem == null) {
if (restartingSystem == null) ActorSystem(
ActorSystem( system.name,
system.name, MultiNodeSpec.configureNextPortIfFixed(
ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]").withFallback(system.settings.config)) ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]").withFallback(system.settings.config)))
else } else {
ActorSystem( ActorSystem(
system.name, system.name,
// use the same port // use the same port
ConfigFactory.parseString(s""" ConfigFactory.parseString(s"""
akka.cluster.roles = [round-$n] akka.cluster.roles = [round-$n]
akka.remote.classic.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get} akka.remote.classic.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}
akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get} akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get}
""").withFallback(system.settings.config)) """).withFallback(system.settings.config))
}
log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress) log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress)
Cluster(restartingSystem).joinSeedNodes(seedNodes) Cluster(restartingSystem).joinSeedNodes(seedNodes)
within(20.seconds) { within(20.seconds) {

View file

@ -26,12 +26,14 @@ class ClusterRemoteFeaturesConfig(artery: Boolean) extends MultiNodeConfig {
val second = role("second") val second = role("second")
val third = role("third") val third = role("third")
private val baseConfig = ConfigFactory.parseString(s""" private val baseConfig = {
ConfigFactory.parseString(s"""
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.enabled = $artery akka.remote.artery.enabled = $artery
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = ${MultiNodeSpec.selfPort}
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
""").withFallback(MultiNodeClusterSpec.clusterConfig) """).withFallback(MultiNodeClusterSpec.clusterConfig)
}
commonConfig(debugConfig(on = false).withFallback(baseConfig)) commonConfig(debugConfig(on = false).withFallback(baseConfig))

View file

@ -52,7 +52,9 @@ abstract class RestartFirstSeedNodeSpec
@volatile var seedNode1Address: Address = _ @volatile var seedNode1Address: Address = _
// use a separate ActorSystem, to be able to simulate restart // use a separate ActorSystem, to be able to simulate restart
lazy val seed1System = ActorSystem(system.name, system.settings.config) lazy val seed1System = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
override def verifySystemShutdown: Boolean = true
def missingSeed = address(seed3).copy(port = Some(61313)) def missingSeed = address(seed3).copy(port = Some(61313))
def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2, seed3, missingSeed) def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2, seed3, missingSeed)
@ -67,6 +69,7 @@ abstract class RestartFirstSeedNodeSpec
override def afterAll(): Unit = { override def afterAll(): Unit = {
runOn(seed1) { runOn(seed1) {
shutdown(if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System) shutdown(if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System)
} }
super.afterAll() super.afterAll()
} }

View file

@ -56,7 +56,9 @@ abstract class RestartNode2SpecSpec
@volatile var seedNode1Address: Address = _ @volatile var seedNode1Address: Address = _
// use a separate ActorSystem, to be able to simulate restart // use a separate ActorSystem, to be able to simulate restart
lazy val seed1System = ActorSystem(system.name, system.settings.config) lazy val seed1System = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
override def verifySystemShutdown: Boolean = true
def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2) def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2)

View file

@ -56,7 +56,9 @@ abstract class RestartNode3Spec
@volatile var secondUniqueAddress: UniqueAddress = _ @volatile var secondUniqueAddress: UniqueAddress = _
// use a separate ActorSystem, to be able to simulate restart // use a separate ActorSystem, to be able to simulate restart
lazy val secondSystem = ActorSystem(system.name, system.settings.config) lazy val secondSystem = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
override def verifySystemShutdown: Boolean = true
def seedNodes: immutable.IndexedSeq[Address] = Vector(first) def seedNodes: immutable.IndexedSeq[Address] = Vector(first)

View file

@ -76,7 +76,9 @@ abstract class RestartNodeSpec
@volatile var secondUniqueAddress: UniqueAddress = _ @volatile var secondUniqueAddress: UniqueAddress = _
// use a separate ActorSystem, to be able to simulate restart // use a separate ActorSystem, to be able to simulate restart
lazy val secondSystem = ActorSystem(system.name, system.settings.config) lazy val secondSystem = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
override def verifySystemShutdown: Boolean = true
def seedNodes: immutable.IndexedSeq[Address] = Vector(first, secondUniqueAddress.address, third) def seedNodes: immutable.IndexedSeq[Address] = Vector(first, secondUniqueAddress.address, third)

View file

@ -458,6 +458,8 @@ abstract class StressSpec
override def shutdownTimeout: FiniteDuration = 30.seconds.dilated override def shutdownTimeout: FiniteDuration = 30.seconds.dilated
override def verifySystemShutdown: Boolean = true
override def muteLog(sys: ActorSystem = system): Unit = { override def muteLog(sys: ActorSystem = system): Unit = {
super.muteLog(sys) super.muteLog(sys)
sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
@ -782,7 +784,7 @@ abstract class StressSpec
previousAS.foreach { as => previousAS.foreach { as =>
TestKit.shutdownActorSystem(as) TestKit.shutdownActorSystem(as)
} }
val sys = ActorSystem(system.name, system.settings.config) val sys = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
muteLog(sys) muteLog(sys)
Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq.map(address)) Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq.map(address))
Some(sys) Some(sys)

View file

@ -246,7 +246,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
"handle Update before load" in { "handle Update before load" in {
runOn(first) { runOn(first) {
val sys1 = ActorSystem("AdditionalSys", system.settings.config) val sys1 = ActorSystem("AdditionalSys", MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
val address = Cluster(sys1).selfAddress val address = Cluster(sys1).selfAddress
try { try {
Cluster(sys1).join(address) Cluster(sys1).join(address)

View file

@ -75,7 +75,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
join(first, first) join(first, first)
join(second, first) join(second, first)
val sys2 = ActorSystem(system.name, system.settings.config) val sys2 = ActorSystem(system.name, MultiNodeSpec.configureNextPortIfFixed(system.settings.config))
val cluster2 = Cluster(sys2) val cluster2 = Cluster(sys2)
val distributedData2 = DistributedData(sys2) val distributedData2 = DistributedData(sys2)
val replicator2 = startReplicator(sys2) val replicator2 = startReplicator(sys2)

View file

@ -1,6 +1,7 @@
--- ---
project.description: Multi JVM testing of distributed systems built with Akka. project.description: Multi JVM testing of distributed systems built with Akka.
--- ---
# Multi JVM Testing # Multi JVM Testing
Supports running applications (objects with main methods) and ScalaTest tests in multiple JVMs at the same time. Supports running applications (objects with main methods) and ScalaTest tests in multiple JVMs at the same time.
@ -11,20 +12,22 @@ Useful for integration testing where multiple systems communicate with each othe
The multi-JVM testing is an sbt plugin that you can find at [https://github.com/sbt/sbt-multi-jvm](https://github.com/sbt/sbt-multi-jvm). The multi-JVM testing is an sbt plugin that you can find at [https://github.com/sbt/sbt-multi-jvm](https://github.com/sbt/sbt-multi-jvm).
To configure it in your project you should do the following steps: To configure it in your project you should do the following steps:
1. Add it as a plugin by adding the following to your project/plugins.sbt: 1. Add it as a plugin by adding the following to your project/plugins.sbt:
@@snip [plugins.sbt](/project/plugins.sbt) { #sbt-multi-jvm }
2. Add multi-JVM testing to `build.sbt` or `project/Build.scala` by enabling `MultiJvmPlugin` and
setting the `MultiJvm` config.
```none ```none
lazy val root = (project in file(".")) addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
.enablePlugins(MultiJvmPlugin)
.configs(MultiJvm)
``` ```
**Please note** that by default MultiJvm test sources are located in `src/multi-jvm/...`, 2. Add multi-JVM testing to `build.sbt` or `project/Build.scala` by enabling `MultiJvmPlugin` and
setting the `MultiJvm` config.
```none
lazy val root = (project in file("."))
.enablePlugins(MultiJvmPlugin)
.configs(MultiJvm)
```
**Please note** that by default MultiJvm test sources are located in `src/multi-jvm/...`,
and not in `src/test/...`. and not in `src/test/...`.
## Running tests ## Running tests
@ -148,7 +151,7 @@ directory as the test.
For example, to feed the JVM options `-Dakka.remote.port=9991` and `-Xmx256m` to the `SampleMultiJvmNode1` For example, to feed the JVM options `-Dakka.remote.port=9991` and `-Xmx256m` to the `SampleMultiJvmNode1`
let's create three `*.opts` files and add the options to them. Separate multiple options with let's create three `*.opts` files and add the options to them. Separate multiple options with
space. space.
`SampleMultiJvmNode1.opts`: `SampleMultiJvmNode1.opts`:

View file

@ -170,6 +170,20 @@ object MultiNodeSpec {
require(selfPort >= 0 && selfPort < 65535, "multinode.port is out of bounds: " + selfPort) require(selfPort >= 0 && selfPort < 65535, "multinode.port is out of bounds: " + selfPort)
/**
* UDP Port number to be used on this node. 0 means a random port.
*
* {{{
* -Dmultinode.udp-port=0
* }}}
*/
val udpPort: Option[Int] = Option(System.getProperty("multinode.udp-port")) match {
case None => None
case Some(_) => Some(Integer.getInteger("multinode.udp-port", 0))
}
require(udpPort.getOrElse(1) >= 0 && udpPort.getOrElse(1) < 65535, "multinode.udp-port is out of bounds: " + udpPort)
/** /**
* Name (or IP address; must be resolvable using InetAddress.getByName) * Name (or IP address; must be resolvable using InetAddress.getByName)
* of the host that the server node is running on. * of the host that the server node is running on.
@ -243,6 +257,24 @@ object MultiNodeSpec {
ConfigFactory.parseMap(map.asJava) ConfigFactory.parseMap(map.asJava)
} }
// Multi node tests on kuberenetes require fixed ports to be mapped and exposed
// This method change the port bindings to avoid conflicts
// Please note that with the current setup only port 5000 and 5001 are exposed in kubernetes
def configureNextPortIfFixed(config: Config): Config = {
val arteryPortConfig = getNextPortString("akka.remote.artery.canonical.port", config)
val nettyPortConfig = getNextPortString("akka.remote.classic.netty.tcp.port", config)
ConfigFactory.parseString(s"""{
$arteryPortConfig
$nettyPortConfig
}""").withFallback(config)
}
private def getNextPortString(key: String, config: Config): String = {
val port = config.getInt(key)
if (port != 0)
s"""$key = ${port + 1}"""
else ""
}
} }
/** /**

View file

@ -79,8 +79,12 @@ abstract class AeronStreamConsistencySpec
def channel(roleName: RoleName) = { def channel(roleName: RoleName) = {
val n = node(roleName) val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort val port = MultiNodeSpec.udpPort match {
val port = expectMsgType[Int] case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port" s"aeron:udp?endpoint=${n.address.host.get}:$port"
} }

View file

@ -108,8 +108,13 @@ abstract class AeronStreamLatencySpec
def channel(roleName: RoleName) = { def channel(roleName: RoleName) = {
val n = node(roleName) val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
val port = expectMsgType[Int] val port = MultiNodeSpec.udpPort match {
case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port" s"aeron:udp?endpoint=${n.address.host.get}:$port"
} }

View file

@ -108,8 +108,12 @@ abstract class AeronStreamMaxThroughputSpec
def channel(roleName: RoleName) = { def channel(roleName: RoleName) = {
val n = node(roleName) val n = node(roleName)
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort val port = MultiNodeSpec.udpPort match {
val port = expectMsgType[Int] case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port" s"aeron:udp?endpoint=${n.address.host.get}:$port"
} }

View file

@ -36,7 +36,7 @@ addCommandAlias(name = "sortImports", value = ";scalafixEnable; scalafixAll Sort
import akka.AkkaBuild._ import akka.AkkaBuild._
import akka.{ AkkaBuild, Dependencies, OSGi, Protobuf, SigarLoader, VersionGenerator } import akka.{ AkkaBuild, Dependencies, OSGi, Protobuf, SigarLoader, VersionGenerator }
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys.MultiJvm
import com.typesafe.tools.mima.plugin.MimaPlugin import com.typesafe.tools.mima.plugin.MimaPlugin
import sbt.Keys.{ initialCommands, parallelExecution } import sbt.Keys.{ initialCommands, parallelExecution }
import spray.boilerplate.BoilerplatePlugin import spray.boilerplate.BoilerplatePlugin

1
kubernetes/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
.tmp

View file

@ -0,0 +1,80 @@
#!/bin/bash -e
# Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Usage:
# create-cluster-gke.sh [CLUSTER-NAME] [CLUSTER-VERSION]
if [ $# -eq 0 ]
then
echo "No cluster name supplied"
echo "Usage: create-cluster-gke.sh [CLUSTER-NAME] (Optional)[CLUSTER-VERSION]"
exit 1
fi
gcloudZone=$(gcloud config get-value compute/zone)
if [ "$gcloudZone" == "" ]
then
echo "No compute/zone set in your GCloud configuration"
echo "Please set a compute zone by running: gcloud config set compute/zone VALUE [optional flags]"
exit 1
fi
gcloudRegion=$(gcloud config get-value compute/region)
if [ "$gcloudRegion" == "" ]
then
echo "No compute/region set in your GCloud configuration"
echo "Please set a compute region by running: gcloud config set compute/region VALUE [optional flags]"
exit 1
fi
gcloudProject=$(gcloud config get-value project)
if [ "$gcloudProject" == "" ]
then
echo "No project set in your GCloud configuration"
echo "Please set a compute region by running: gcloud config set project VALUE"
exit 1
fi
CLUSTER_NAME=$1
CLUSTER_VERSION=$2
if [ -z "$CLUSTER_VERSION" ]
then
# https://cloud.google.com/kubernetes-engine/versioning-and-upgrades#versions_available_for_new_cluster_masters
CLUSTER_VERSION=$(gcloud container get-server-config --format="value(defaultClusterVersion)")
echo "No cluster version specified. Using the default: $CLUSTER_VERSION"
else
echo "Cluster version: $CLUSTER_VERSION"
fi
# Create cluster
gcloud container clusters create $CLUSTER_NAME \
--cluster-version $CLUSTER_VERSION \
--enable-ip-alias \
--image-type cos \
--machine-type n1-standard-4 \
--num-nodes 5 \
--no-enable-autoupgrade
# --workload-pool=$gcloudProject.svc.id.goog # becoming default in next version, allows mapping of GCP service accounts to k8s service accounts
## Wait for clusters to come up
echo "Waiting for cluster to become stable before continuing with the installation....."
gcloud compute instance-groups managed list --filter="name~gke-$CLUSTER_NAME" --format="value(name)" | while read -r line ; do
gcloud compute instance-groups managed wait-until --stable $line
done
# Switch to new cluster
gcloud container clusters get-credentials $CLUSTER_NAME

21
kubernetes/setup.sh Executable file
View file

@ -0,0 +1,21 @@
#!/bin/bash
NUM_OF_NODES=$1
DEST_HOST_FILE=$2
TMP_DIR=.tmp
kubectl delete deployments,services -l app=multi-node-test | true
rm -rf ${DEST_HOST_FILE}
rm -rf ${TMP_DIR}
mkdir -p ${TMP_DIR}
touch ${DEST_HOST_FILE}
for i in `seq 1 "${NUM_OF_NODES}"`;
do
cat ./kubernetes/test-node-base.yaml | sed "s/test-nodeX/test-node${i}/" > ".tmp/test-node${i}.yml"
echo $i
echo "test-node${i}:/usr/local/openjdk-11/bin/java -Dmultinode.port=5000 -Dmultinode.udp-port=6000" >> ${DEST_HOST_FILE}
done
kubectl apply -f ${TMP_DIR}

View file

@ -0,0 +1,92 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-nodeX
labels:
app: multi-node-test
spec:
replicas: 1
selector:
matchLabels:
host: test-nodeX
template:
metadata:
labels:
host: test-nodeX
spec:
containers:
- image: openjdk:11
command: ["sleep", "infinity"]
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "2Gi"
lifecycle:
postStart:
exec:
command:
- /bin/sh
- -c
- truncate --size -1 /etc/hosts && echo " test-nodeX" >> /etc/hosts
imagePullPolicy: Always
name: multi-test-nodeX
volumeMounts:
- mountPath: /dev/shm
name: dshm
ports:
- name: web
containerPort: 80
protocol: TCP
- name: ssh
containerPort: 22
protocol: TCP
- name: multi-node
containerPort: 5000
protocol: TCP
- name: multi-node2
containerPort: 5001
protocol: TCP
- name: multi-node-udp
containerPort: 6000
protocol: UDP
- name: server-multi
containerPort: 4711
protocol: TCP
volumes:
# Needed for Aeron tests: https://github.com/real-logic/aeron/blob/master/README.md#troubleshooting
- name: dshm
emptyDir:
medium: Memory
---
apiVersion: v1
kind: Service
metadata:
name: test-nodeX
labels:
app: multi-node-test
spec:
selector:
host: test-nodeX
ports:
- protocol: TCP
name: ssh
port: 22
targetPort: 22
- protocol: TCP
name: server-multi
port: 4711
targetPort: 4711
- protocol: TCP
name: multi-node
port: 5000
targetPort: 5000
- protocol: TCP
name: multi-node2
port: 5001
targetPort: 5001
- protocol: UDP
name: multi-node-udp
port: 6000
targetPort: 6000

118
project/Jvm.scala Normal file
View file

@ -0,0 +1,118 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package com.typesafe.sbt.multijvm
import java.io.File
import java.lang.{ ProcessBuilder => JProcessBuilder }
import sbt._
import scala.sys.process.Process
object Jvm {
def startJvm(
javaBin: File,
jvmOptions: Seq[String],
runOptions: Seq[String],
logger: Logger,
connectInput: Boolean) = {
forkJava(javaBin, jvmOptions ++ runOptions, logger, connectInput)
}
def forkJava(javaBin: File, options: Seq[String], logger: Logger, connectInput: Boolean) = {
val java = javaBin.toString
val command = (java :: options.toList).toArray
val builder = new JProcessBuilder(command: _*)
Process(builder).run(logger, connectInput)
}
/**
* check if the current operating system is some OS
**/
def isOS(os: String) =
try {
System.getProperty("os.name").toUpperCase.startsWith(os.toUpperCase)
} catch {
case _: Throwable => false
}
/**
* convert to proper path for the operating system
**/
def osPath(path: String) = if (isOS("WINDOWS")) Process(Seq("cygpath", path)).lineStream.mkString else path
def getPodName(hostAndUser: String, sbtLogger: Logger): String = {
val command: Array[String] =
Array("kubectl", "get", "pods", "-l", s"host=$hostAndUser", "--no-headers", "-o", "name")
val builder = new JProcessBuilder(command: _*)
sbtLogger.debug("Jvm.getPodName about to run " + command.mkString(" "))
val podName = Process(builder).!!
sbtLogger.debug("Jvm.getPodName podName is " + podName)
podName.stripPrefix("pod/").stripSuffix("\n")
}
def syncJar(jarName: String, hostAndUser: String, remoteDir: String, sbtLogger: Logger): Process = {
val podName = getPodName(hostAndUser, sbtLogger)
val command: Array[String] =
Array("kubectl", "exec", podName, "--", "/bin/bash", "-c", s"rm -rf $remoteDir && mkdir -p $remoteDir")
val builder = new JProcessBuilder(command: _*)
sbtLogger.debug("Jvm.syncJar about to run " + command.mkString(" "))
val process = Process(builder).run(sbtLogger, false)
if (process.exitValue() == 0) {
val command: Array[String] = Array("kubectl", "cp", osPath(jarName), podName + ":" + remoteDir + "/")
val builder = new JProcessBuilder(command: _*)
sbtLogger.debug("Jvm.syncJar about to run " + command.mkString(" "))
Process(builder).run(sbtLogger, false)
} else {
process
}
}
def forkRemoteJava(
java: String,
jvmOptions: Seq[String],
appOptions: Seq[String],
jarName: String,
hostAndUser: String,
remoteDir: String,
logger: Logger,
connectInput: Boolean,
sbtLogger: Logger): Process = {
val podName = getPodName(hostAndUser, sbtLogger)
sbtLogger.debug("About to use java " + java)
val shortJarName = new File(jarName).getName
val javaCommand = List(List(java), jvmOptions, List("-cp", shortJarName), appOptions).flatten
val command = Array(
"kubectl",
"exec",
podName,
"--",
"/bin/bash",
"-c",
("cd " :: (remoteDir :: (" ; " :: javaCommand))).mkString(" "))
sbtLogger.debug("Jvm.forkRemoteJava about to run " + command.mkString(" "))
val builder = new JProcessBuilder(command: _*)
Process(builder).run(logger, connectInput)
}
}
class JvmBasicLogger(name: String) extends BasicLogger {
def jvm(message: String) = "[%s] %s".format(name, message)
def log(level: Level.Value, message: => String) = System.out.synchronized {
System.out.println(jvm(message))
}
def trace(t: => Throwable) = System.out.synchronized {
val traceLevel = getTrace
if (traceLevel >= 0) System.out.print(StackTrace.trimmed(t, traceLevel))
}
def success(message: => String) = log(Level.Info, message)
def control(event: ControlEvent.Value, message: => String) = log(Level.Info, message)
def logAll(events: Seq[LogEvent]) = System.out.synchronized { events.foreach(log) }
}
final class JvmLogger(name: String) extends JvmBasicLogger(name)

View file

@ -6,8 +6,8 @@ package akka
import akka.TestExtras.Filter.Keys._ import akka.TestExtras.Filter.Keys._
import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys.multiJvmCreateLogger import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys.multiJvmCreateLogger
import com.typesafe.sbt.SbtMultiJvm import com.typesafe.sbt.{ MultiJvmPlugin => SbtMultiJvm }
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys._ import com.typesafe.sbt.MultiJvmPlugin.MultiJvmKeys._
import sbt.{ Def, _ } import sbt.{ Def, _ }
import sbt.Keys._ import sbt.Keys._
import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._ import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._

543
project/SbtMultiJvm.scala Normal file
View file

@ -0,0 +1,543 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package com.typesafe.sbt
import com.typesafe.sbt.multijvm.{ Jvm, JvmLogger }
import scala.sys.process.Process
import sjsonnew.BasicJsonProtocol._
import sbt._
import Keys._
import java.io.File
import java.lang.Boolean.getBoolean
import scala.Console.{ GREEN, RESET }
import sbtassembly.AssemblyPlugin.assemblySettings
import sbtassembly.{ AssemblyKeys, MergeStrategy }
import AssemblyKeys._
object MultiJvmPlugin extends AutoPlugin {
case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String])
object MultiJvmKeys {
val MultiJvm = config("multi-jvm").extend(Test)
val multiJvmMarker = SettingKey[String]("multi-jvm-marker")
val multiJvmTests = TaskKey[Map[String, Seq[String]]]("multi-jvm-tests")
val multiJvmTestNames = TaskKey[Seq[String]]("multi-jvm-test-names")
val multiJvmApps = TaskKey[Map[String, Seq[String]]]("multi-jvm-apps")
val multiJvmAppNames = TaskKey[Seq[String]]("multi-jvm-app-names")
val multiJvmJavaCommand = TaskKey[File]("multi-jvm-java-command")
val jvmOptions = TaskKey[Seq[String]]("jvm-options") // TODO: shouldn't that be regular `javaOptions`?
val extraOptions = SettingKey[String => Seq[String]]("extra-options")
val multiJvmCreateLogger = TaskKey[String => Logger]("multi-jvm-create-logger")
val scalatestRunner = SettingKey[String]("scalatest-runner")
val scalatestOptions = SettingKey[Seq[String]]("scalatest-options")
val scalatestClasspath = TaskKey[Classpath]("scalatest-classpath")
val scalatestScalaOptions = TaskKey[String => Seq[String]]("scalatest-scala-options")
val scalatestMultiNodeScalaOptions = TaskKey[String => Seq[String]]("scalatest-multi-node-scala-options")
val multiTestOptions = TaskKey[Options]("multi-test-options")
val multiNodeTestOptions = TaskKey[Options]("multi-node-test-options")
val appScalaOptions = TaskKey[String => Seq[String]]("app-scala-options")
val multiRunOptions = TaskKey[Options]("multi-run-options")
val multiRunCopiedClassLocation = SettingKey[File]("multi-run-copied-class-location")
val multiJvmTestJar = TaskKey[String]("multi-jvm-test-jar")
val multiJvmTestJarName = TaskKey[String]("multi-jvm-test-jar-name")
val multiNodeTest = TaskKey[Unit]("multi-node-test")
val multiNodeExecuteTests = TaskKey[Tests.Output]("multi-node-execute-tests")
val multiNodeTestOnly = InputKey[Unit]("multi-node-test-only")
val multiNodeHosts = SettingKey[Seq[String]]("multi-node-hosts")
val multiNodeHostsFileName = SettingKey[String]("multi-node-hosts-file-name")
val multiNodeProcessedHosts = TaskKey[(IndexedSeq[String], IndexedSeq[String])]("multi-node-processed-hosts")
val multiNodeTargetDirName = SettingKey[String]("multi-node-target-dir-name")
val multiNodeJavaName = SettingKey[String]("multi-node-java-name")
// TODO fugly workaround for now
val multiNodeWorkAround =
TaskKey[(String, (IndexedSeq[String], IndexedSeq[String]), String)]("multi-node-workaround")
}
val autoImport = MultiJvmKeys
import MultiJvmKeys._
override def requires = plugins.JvmPlugin
override def projectConfigurations = Seq(MultiJvm)
override def projectSettings = multiJvmSettings
private[this] def noTestsMessage(scoped: ScopedKey[_])(implicit display: Show[ScopedKey[_]]): String =
"No tests to run for " + display.show(scoped)
lazy val multiJvmSettings: Seq[Def.Setting[_]] =
inConfig(MultiJvm)(Defaults.configSettings ++ internalMultiJvmSettings)
// https://github.com/sbt/sbt/blob/v0.13.15/main/actions/src/main/scala/sbt/Tests.scala#L296-L298
private[this] def showResults(log: Logger, results: Tests.Output, noTestsMessage: => String): Unit =
TestResultLogger.Default.copy(printNoTests = TestResultLogger.const(_.info(noTestsMessage))).run(log, results, "")
private def internalMultiJvmSettings =
assemblySettings ++ Seq(
multiJvmMarker := "MultiJvm",
loadedTestFrameworks := (loadedTestFrameworks in Test).value,
definedTests := Defaults.detectTests.value,
multiJvmTests := collectMultiJvm(definedTests.value.map(_.name), multiJvmMarker.value),
multiJvmTestNames := multiJvmTests.map(_.keys.toSeq).storeAs(multiJvmTestNames).triggeredBy(compile).value,
multiJvmApps := collectMultiJvm(discoveredMainClasses.value, multiJvmMarker.value),
multiJvmAppNames := multiJvmApps.map(_.keys.toSeq).storeAs(multiJvmAppNames).triggeredBy(compile).value,
multiJvmJavaCommand := javaCommand(javaHome.value, "java"),
jvmOptions := Seq.empty,
extraOptions := { (name: String) =>
Seq.empty
},
multiJvmCreateLogger := { (name: String) =>
new JvmLogger(name)
},
scalatestRunner := "org.scalatest.tools.Runner",
scalatestOptions := defaultScalatestOptions,
scalatestClasspath := managedClasspath.value.filter(_.data.name.contains("scalatest")),
multiRunCopiedClassLocation := new File(target.value, "multi-run-copied-libraries"),
scalatestScalaOptions := scalaOptionsForScalatest(
scalatestRunner.value,
scalatestOptions.value,
fullClasspath.value,
multiRunCopiedClassLocation.value),
scalatestMultiNodeScalaOptions := scalaMultiNodeOptionsForScalatest(
scalatestRunner.value,
scalatestOptions.value),
multiTestOptions := Options(jvmOptions.value, extraOptions.value, scalatestScalaOptions.value),
multiNodeTestOptions := Options(jvmOptions.value, extraOptions.value, scalatestMultiNodeScalaOptions.value),
appScalaOptions := scalaOptionsForApps(fullClasspath.value),
connectInput := true,
multiRunOptions := Options(jvmOptions.value, extraOptions.value, appScalaOptions.value),
executeTests := multiJvmExecuteTests.value,
testOnly := multiJvmTestOnly.evaluated,
test := showResults(streams.value.log, executeTests.value, "No tests to run for MultiJvm"),
run := multiJvmRun.evaluated,
runMain := multiJvmRun.evaluated,
// TODO try to make sure that this is only generated on a need to have basis
multiJvmTestJar := (assemblyOutputPath in assembly).map(_.getAbsolutePath).dependsOn(assembly).value,
multiJvmTestJarName := (assemblyOutputPath in assembly).value.getAbsolutePath,
multiNodeTest := {
implicit val display = Project.showContextKey(state.value)
showResults(streams.value.log, multiNodeExecuteTests.value, noTestsMessage(resolvedScoped.value))
},
multiNodeExecuteTests := multiNodeExecuteTestsTask.value,
multiNodeTestOnly := multiNodeTestOnlyTask.evaluated,
multiNodeHosts := Seq.empty,
multiNodeHostsFileName := "multi-node-test.hosts",
multiNodeProcessedHosts := processMultiNodeHosts(
multiNodeHosts.value,
multiNodeHostsFileName.value,
multiNodeJavaName.value,
streams.value),
multiNodeTargetDirName := "multi-node-test",
multiNodeJavaName := "java",
// TODO there must be a way get at keys in the tasks that I just don't get
multiNodeWorkAround := (multiJvmTestJar.value, multiNodeProcessedHosts.value, multiNodeTargetDirName.value),
// here follows the assembly parts of the config
// don't run the tests when creating the assembly
test in assembly := {},
// we want everything including the tests and test frameworks
fullClasspath in assembly := (fullClasspath in MultiJvm).value,
// the first class wins just like a classpath
// just concatenate conflicting text files
assemblyMergeStrategy in assembly := {
case n if n.endsWith(".class") => MergeStrategy.first
case n if n.endsWith(".txt") => MergeStrategy.concat
case n if n.endsWith("NOTICE") => MergeStrategy.concat
case n => (assemblyMergeStrategy in assembly).value.apply(n)
},
assemblyJarName in assembly := {
name.value + "_" + scalaVersion.value + "-" + version.value + "-multi-jvm-assembly.jar"
})
def collectMultiJvm(discovered: Seq[String], marker: String): Map[String, Seq[String]] = {
val found = discovered.filter(_.contains(marker)).groupBy(multiName(_, marker))
found.map {
case (key, values) =>
val totalNodes = sys.props.get(marker + "." + key + ".nrOfNodes").getOrElse(values.size.toString).toInt
val sortedClasses = values.sorted
val totalClasses = sortedClasses.padTo(totalNodes, sortedClasses.last)
(key, totalClasses)
}
}
def multiName(name: String, marker: String) = name.split(marker).head
def multiSimpleName(name: String) = name.split("\\.").last
def javaCommand(javaHome: Option[File], name: String): File = {
val home = javaHome.getOrElse(new File(System.getProperty("java.home")))
new File(new File(home, "bin"), name)
}
def defaultScalatestOptions: Seq[String] = {
if (getBoolean("sbt.log.noformat")) Seq("-oW") else Seq("-o")
}
def scalaOptionsForScalatest(
runner: String,
options: Seq[String],
fullClasspath: Classpath,
multiRunCopiedClassDir: File) = {
val directoryBasedClasspathEntries = fullClasspath.files.filter(_.isDirectory)
// Copy over just the jars to this folder.
fullClasspath.files
.filter(_.isFile)
.foreach(classpathFile =>
IO.copyFile(classpathFile, new File(multiRunCopiedClassDir, classpathFile.getName), true))
val cp = directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File.separator + "*"
(testClass: String) => { Seq("-cp", cp, runner, "-s", testClass) ++ options }
}
def scalaMultiNodeOptionsForScalatest(runner: String, options: Seq[String]) = { (testClass: String) =>
{ Seq(runner, "-s", testClass) ++ options }
}
def scalaOptionsForApps(classpath: Classpath) = {
val cp = classpath.files.absString
(mainClass: String) => Seq("-cp", cp, mainClass)
}
def multiJvmExecuteTests: Def.Initialize[sbt.Task[Tests.Output]] = Def.task {
runMultiJvmTests(
multiJvmTests.value,
multiJvmMarker.value,
multiJvmJavaCommand.value,
multiTestOptions.value,
sourceDirectory.value,
multiJvmCreateLogger.value,
streams.value.log)
}
def multiJvmTestOnly: Def.Initialize[sbt.InputTask[Unit]] =
InputTask.createDyn(loadForParser(multiJvmTestNames)((s, i) => Defaults.testOnlyParser(s, i.getOrElse(Nil)))) {
Def.task {
case (selection, _extraOptions) =>
val s = streams.value
val options = multiTestOptions.value
val opts = options.copy(extra = (s: String) => { options.extra(s) ++ _extraOptions })
val filters = selection.map(GlobFilter(_))
val tests = multiJvmTests.value.filterKeys(name => filters.exists(_.accept(name)))
Def.task {
val results = runMultiJvmTests(
tests,
multiJvmMarker.value,
multiJvmJavaCommand.value,
opts,
sourceDirectory.value,
multiJvmCreateLogger.value,
s.log)
showResults(s.log, results, "No tests to run for MultiJvm")
}
}
}
def runMultiJvmTests(
tests: Map[String, Seq[String]],
marker: String,
javaBin: File,
options: Options,
srcDir: File,
createLogger: String => Logger,
log: Logger): Tests.Output = {
val results =
if (tests.isEmpty)
List()
else
tests.map {
case (_name, classes) => multi(_name, classes, marker, javaBin, options, srcDir, false, createLogger, log)
}
Tests.Output(
Tests.overall(results.map(_._2)),
Map.empty,
results.map(result => Tests.Summary("multi-jvm", result._1)))
}
def multiJvmRun: Def.Initialize[sbt.InputTask[Unit]] =
InputTask.createDyn(loadForParser(multiJvmAppNames)((s, i) => runParser(s, i.getOrElse(Nil)))) {
Def.task {
val s = streams.value
val apps = multiJvmApps.value
val j = multiJvmJavaCommand.value
val c = connectInput.value
val dir = sourceDirectory.value
val options = multiRunOptions.value
val marker = multiJvmMarker.value
val createLogger = multiJvmCreateLogger.value
result => {
val classes = apps.getOrElse(result, Seq.empty)
Def.task {
if (classes.isEmpty) s.log.info("No apps to run.")
else multi(result, classes, marker, j, options, dir, c, createLogger, s.log)
}
}
}
}
def runParser: (State, Seq[String]) => complete.Parser[String] = {
import complete.DefaultParsers._
(state, appClasses) => Space ~> token(NotSpace.examples(appClasses.toSet))
}
def multi(
name: String,
classes: Seq[String],
marker: String,
javaBin: File,
options: Options,
srcDir: File,
input: Boolean,
createLogger: String => Logger,
log: Logger): (String, sbt.TestResult) = {
val logName = "* " + name
log.info(if (log.ansiCodesSupported) GREEN + logName + RESET else logName)
val classesHostsJavas = getClassesHostsJavas(classes, IndexedSeq.empty, IndexedSeq.empty, "")
val hosts = classesHostsJavas.map(_._2)
val processes = classes.zipWithIndex.map {
case (testClass, index) =>
val className = multiSimpleName(testClass)
val jvmName = "JVM-" + (index + 1) + "-" + className
val jvmLogger = createLogger(jvmName)
val optionsFile = (srcDir ** (className + ".opts")).get.headOption
val optionsFromFile =
optionsFile.map(IO.read(_)).map(_.trim.replace("\\n", " ").split("\\s+").toList).getOrElse(Seq.empty[String])
val multiNodeOptions = getMultiNodeCommandLineOptions(hosts, index, classes.size)
val allJvmOptions = options.jvm ++ multiNodeOptions ++ optionsFromFile ++ options.extra(className)
val runOptions = options.run(testClass)
val connectInput = input && index == 0
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
(testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput))
}
processExitCodes(name, processes, log)
}
def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = {
val exitCodes = processes.map {
case (testClass, process) => (testClass, process.exitValue())
}
val failures = exitCodes.flatMap {
case (testClass, exit) if exit > 0 => Some("Failed: " + testClass)
case _ => None
}
failures.foreach(log.error(_))
(name, if (failures.nonEmpty) TestResult.Failed else TestResult.Passed)
}
def multiNodeExecuteTestsTask: Def.Initialize[sbt.Task[Tests.Output]] = Def.task {
val (_jarName, (hostsAndUsers, javas), targetDir) = multiNodeWorkAround.value
runMultiNodeTests(
multiJvmTests.value,
multiJvmMarker.value,
multiNodeJavaName.value,
multiNodeTestOptions.value,
sourceDirectory.value,
_jarName,
hostsAndUsers,
javas,
targetDir,
multiJvmCreateLogger.value,
streams.value.log)
}
def multiNodeTestOnlyTask: Def.Initialize[InputTask[Unit]] =
InputTask.createDyn(loadForParser(multiJvmTestNames)((s, i) => Defaults.testOnlyParser(s, i.getOrElse(Nil)))) {
Def.task {
case (selected, _extraOptions) =>
val options = multiNodeTestOptions.value
val (_jarName, (hostsAndUsers, javas), targetDir) = multiNodeWorkAround.value
val s = streams.value
val opts = options.copy(extra = (s: String) => { options.extra(s) ++ _extraOptions })
val tests = selected.flatMap { name =>
multiJvmTests.value.get(name).map((name, _))
}
Def.task {
val results = runMultiNodeTests(
tests.toMap,
multiJvmMarker.value,
multiNodeJavaName.value,
opts,
sourceDirectory.value,
_jarName,
hostsAndUsers,
javas,
targetDir,
multiJvmCreateLogger.value,
s.log)
showResults(s.log, results, "No tests to run for MultiNode")
}
}
}
def runMultiNodeTests(
tests: Map[String, Seq[String]],
marker: String,
java: String,
options: Options,
srcDir: File,
jarName: String,
hostsAndUsers: IndexedSeq[String],
javas: IndexedSeq[String],
targetDir: String,
createLogger: String => Logger,
log: Logger): Tests.Output = {
val results =
if (tests.isEmpty)
List()
else
tests.map {
case (_name, classes) =>
multiNode(
_name,
classes,
marker,
java,
options,
srcDir,
false,
jarName,
hostsAndUsers,
javas,
targetDir,
createLogger,
log)
}
Tests.Output(
Tests.overall(results.map(_._2)),
Map.empty,
results.map(result => Tests.Summary("multi-jvm", result._1)))
}
def multiNode(
name: String,
classes: Seq[String],
marker: String,
defaultJava: String,
options: Options,
srcDir: File,
input: Boolean,
testJar: String,
hostsAndUsers: IndexedSeq[String],
javas: IndexedSeq[String],
targetDir: String,
createLogger: String => Logger,
log: Logger): (String, sbt.TestResult) = {
val logName = "* " + name
log.info(if (log.ansiCodesSupported) GREEN + logName + RESET else logName)
val classesHostsJavas = getClassesHostsJavas(classes, hostsAndUsers, javas, defaultJava)
val hosts = classesHostsJavas.map(_._2)
// TODO move this out, maybe to the hosts string as well?
val syncProcesses = classesHostsJavas.map {
case ((testClass, hostAndUser, java)) =>
(testClass + " sync", Jvm.syncJar(testJar, hostAndUser, targetDir, log))
}
val syncResult = processExitCodes(name, syncProcesses, log)
if (syncResult._2 == TestResult.Passed) {
val processes = classesHostsJavas.zipWithIndex.map {
case ((testClass, hostAndUser, java), index) => {
val jvmName = "JVM-" + (index + 1)
val jvmLogger = createLogger(jvmName)
val className = multiSimpleName(testClass)
val optionsFile = (srcDir ** (className + ".opts")).get.headOption
val optionsFromFile = optionsFile
.map(IO.read(_))
.map(_.trim.replace("\\n", " ").split("\\s+").toList)
.getOrElse(Seq.empty[String])
val multiNodeOptions = getMultiNodeCommandLineOptions(hosts, index, classes.size)
val allJvmOptions = options.jvm ++ optionsFromFile ++ options.extra(className) ++ multiNodeOptions
val runOptions = options.run(testClass)
val connectInput = input && index == 0
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
(
testClass,
Jvm.forkRemoteJava(
java,
allJvmOptions,
runOptions,
testJar,
hostAndUser,
targetDir,
jvmLogger,
connectInput,
log))
}
}
processExitCodes(name, processes, log)
} else {
syncResult
}
}
private def padSeqOrDefaultTo(seq: IndexedSeq[String], default: String, max: Int): IndexedSeq[String] = {
val realSeq = if (seq.isEmpty) IndexedSeq(default) else seq
if (realSeq.size >= max)
realSeq
else
(realSeq /: (0 until (max - realSeq.size)))((mySeq, pos) => mySeq :+ realSeq(pos % realSeq.size))
}
private def getClassesHostsJavas(
classes: Seq[String],
hostsAndUsers: IndexedSeq[String],
javas: IndexedSeq[String],
defaultJava: String): IndexedSeq[(String, String, String)] = {
val max = classes.length
val tuple = (
classes.toIndexedSeq,
padSeqOrDefaultTo(hostsAndUsers, "localhost", max),
padSeqOrDefaultTo(javas, defaultJava, max))
tuple.zipped.map { case (className: String, hostAndUser: String, _java: String) => (className, hostAndUser, _java) }
}
private def getMultiNodeCommandLineOptions(hosts: Seq[String], index: Int, maxNodes: Int): Seq[String] = {
Seq(
"-Dmultinode.max-nodes=" + maxNodes,
"-Dmultinode.server-host=" + hosts(0).split("@").last,
"-Dmultinode.host=" + hosts(index).split("@").last,
"-Dmultinode.index=" + index)
}
private def processMultiNodeHosts(
hosts: Seq[String],
hostsFileName: String,
defaultJava: String,
s: Types.Id[Keys.TaskStreams]): (IndexedSeq[String], IndexedSeq[String]) = {
val hostsFile = new File(hostsFileName)
val theHosts: IndexedSeq[String] =
if (hosts.isEmpty) {
if (hostsFile.exists && hostsFile.canRead) {
s.log.info("Using hosts defined in file " + hostsFile.getAbsolutePath)
IO.readLines(hostsFile).map(_.trim).filter(_.length > 0).toIndexedSeq
} else
hosts.toIndexedSeq
} else {
if (hostsFile.exists && hostsFile.canRead)
s.log.info(
"Hosts from setting " + multiNodeHosts.key.label + " is overrriding file " + hostsFile.getAbsolutePath)
hosts.toIndexedSeq
}
theHosts.map { x =>
val elems = x.split(":").toList.take(2).padTo(2, defaultJava)
(elems(0), elems(1))
} unzip
}
}

View file

@ -3,11 +3,6 @@ libraryDependencies += Defaults.sbtPluginExtra(
(pluginCrossBuild / sbtBinaryVersion).value, (pluginCrossBuild / sbtBinaryVersion).value,
(pluginCrossBuild / scalaBinaryVersion).value) (pluginCrossBuild / scalaBinaryVersion).value)
// these comment markers are for including code into the docs
//#sbt-multi-jvm
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
//#sbt-multi-jvm
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0") addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-bill-of-materials" % "1.0.2") addSbtPlugin("com.lightbend.sbt" % "sbt-bill-of-materials" % "1.0.2")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")