move aeron tests in multi-node (#30706)

This commit is contained in:
Renato Cavalcanti 2021-10-20 08:07:47 +02:00 committed by GitHub
parent f561146fa7
commit 4ef9b31d8e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 178 additions and 125 deletions

View file

@ -19,6 +19,7 @@ jobs:
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Install Kubectl
run: |
sudo snap install kubectl --classic
@ -26,21 +27,26 @@ jobs:
with:
service_account_key: ${{ secrets.GKE_SA_KEY }}
project_id: ${{ secrets.GKE_PROJECT }}
- name: Create the cluster
run: |-
gcloud config set compute/region us-central1
gcloud config set compute/zone us-central1-c
./kubernetes/create-cluster-gke.sh "akka-multi-node-${GITHUB_RUN_ID}"
- name: Setup Pods
run: |
# Stress tests are using 13 nodes
./kubernetes/setup.sh 15 multi-node-test.hosts
# Stress tests are using 13 nodes.
./kubernetes/setup.sh 15 multi-node-test.hosts tcp
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11.0-9
- name: Cache Coursier cache
uses: coursier/cache-action@v6.2
- name: Multi node test
run: |
cat multi-node-test.hosts
@ -57,6 +63,7 @@ jobs:
-Dmultinode.Xmx512M \
-Dmultinode.Xlog:gc \
multiNodeTest
- name: Email on failure
if: ${{ failure() }}
uses: dawidd6/action-send-mail@v3
@ -73,8 +80,96 @@ jobs:
body: |
Multi node test of ${{github.repository}} failed!
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
- name: Cleanup the environment
if: ${{ always() }}
shell: bash {0}
run: |
gcloud container clusters delete "akka-multi-node-${GITHUB_RUN_ID}" --quiet
akka-artery-aeron-cluster-tests:
name: Artery Aeron UDP Cluster
runs-on: ubuntu-20.04
if: github.repository == 'akka/akka'
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Install Kubectl
run: |
sudo snap install kubectl --classic
- uses: google-github-actions/setup-gcloud@v0.2
with:
service_account_key: ${{ secrets.GKE_SA_KEY }}
project_id: ${{ secrets.GKE_PROJECT }}
- name: Create the cluster
run: |-
gcloud config set compute/region us-central1
gcloud config set compute/zone us-central1-c
./kubernetes/create-cluster-gke.sh "akka-artery-aeron-cluster-${GITHUB_RUN_ID}"
- name: Setup Pods
run: |
# Stress tests are using 13 nodes
./kubernetes/setup.sh 15 multi-node-test.hosts udp
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11.0-9
- name: Cache Coursier cache
uses: coursier/cache-action@v6.2
- name: Artery Aeron UDP Cluster test
run: |
cat multi-node-test.hosts
sbt -jvm-opts .jvmopts-ci \
-Dakka.test.timefactor=2 \
-Dakka.actor.testkit.typed.timefactor=2 \
-Dakka.cluster.assert=on \
-Dakka.remote.artery.transport=aeron-udp \
-Dsbt.override.build.repos=false \
-Dakka.test.tags.exclude=gh-exclude \
-Dakka.test.multi-node=true \
-Dakka.test.multi-node.targetDirName=${PWD}/target/${{ github.run_id }} \
-Dakka.test.multi-node.java=${JAVA_HOME}/bin/java \
-Dmultinode.XX:MetaspaceSize=128M \
-Dmultinode.Xms512M \
-Dmultinode.Xmx512M \
-Dmultinode.Xlog:gc \
akka-cluster/test \
akka-distributed-data/test \
akka-cluster-tools/test \
akka-cluster-metrics/test \
akka-cluster-sharding/test \
akka-cluster-typed/test \
akka-cluster-sharding-typed/test \
akka-remote/test \
akka-remote-tests/test
- name: Email on failure
if: ${{ failure() }}
uses: dawidd6/action-send-mail@v3
with:
server_address: smtp.gmail.com
server_port: 465
# Using port 465 already sets `secure: true`
secure: true
username: ${{secrets.MAIL_USERNAME}}
password: ${{secrets.MAIL_PASSWORD}}
subject: Artery Aeron UDP Cluster (Akka)
to: akka.official@gmail.com
from: Akka CI (GHActions)
body: |
Artery Aeron UDP Cluster of ${{github.repository}} failed!
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
- name: Cleanup the environment
if: ${{ always() }}
shell: bash {0}
run: |
gcloud container clusters delete "akka-artery-aeron-cluster-${GITHUB_RUN_ID}" --quiet

View file

@ -196,63 +196,3 @@ jobs:
body: |
Job ${{ github.job }} in workflow ${{ github.workflow }} of ${{github.repository}} failed!
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
akka-artery-aeron-cluster-tests:
name: Artery Aeron UDP Cluster
runs-on: ubuntu-20.04
# FIXME disabled due to https://github.com/akka/akka/issues/30601
#if: github.repository == 'akka/akka'
if: ${{ false }}
strategy:
fail-fast: false
matrix:
command:
- akka-cluster/test akka-distributed-data/test akka-cluster-tools/test akka-cluster-metrics/test
- akka-cluster-sharding/test
- akka-cluster-typed/test akka-cluster-sharding-typed/test
- akka-remote/test akka-remote-tests/test
steps:
- name: Checkout
uses: actions/checkout@v2
with:
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11
- name: Cache Coursier cache
uses: coursier/cache-action@v6.2
- name: sbt ${{ matrix.command }}
run: |-
sbt -jvm-opts .jvmopts-ci \
-Djava.security.egd=file:/dev/./urandom \
-Dakka.remote.artery.transport=aeron-udp \
-Dakka.test.timefactor=2 \
-Dakka.actor.testkit.typed.timefactor=2 \
-Dakka.cluster.assert=on \
-Dakka.test.tags.exclude=gh-exclude \
-Dmultinode.XX:MetaspaceSize=128M \
-Dmultinode.Xms256M \
-Dmultinode.Xmx256M \
-Dmultinode.Xlog:gc \
clean ${{ matrix.command }}
- name: Email on failure
if: ${{ failure() }}
uses: dawidd6/action-send-mail@v3
with:
server_address: smtp.gmail.com
server_port: 465
username: ${{secrets.MAIL_USERNAME}}
password: ${{secrets.MAIL_PASSWORD}}
subject: "Failed: ${{ github.workflow }} / ${{ github.job }}"
to: akka.official@gmail.com
from: Akka CI (GHActions)
body: |
Job ${{ github.job }} in workflow ${{ github.workflow }} of ${{github.repository}} failed!
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}

View file

@ -405,7 +405,13 @@ class RandomizedSplitBrainResolverIntegrationSpec
"SplitBrainResolver with lease" must {
for (scenario <- scenarios) {
scenario.toString taggedAs LongRunningTest in {
scenario.toString taggedAs (LongRunningTest) in {
// temporarily disabled for aeron-udp in multi-node: https://github.com/akka/akka/pull/30706/
val arteryConfig = system.settings.config.getConfig("akka.remote.artery")
if (arteryConfig.getInt("canonical.port") == 6000 &&
arteryConfig.getString("transport") == "aeron-udp") {
pending
}
DisposableSys(scenario).verify()
}
}

View file

@ -459,6 +459,12 @@ class SplitBrainResolverIntegrationSpec
for (scenario <- scenarios) {
scenario.toString taggedAs LongRunningTest in {
// temporarily disabled for aeron-udp in multi-node: https://github.com/akka/akka/pull/30706/
val arteryConfig = system.settings.config.getConfig("akka.remote.artery")
if (arteryConfig.getInt("canonical.port") == 6000 &&
arteryConfig.getString("transport") == "aeron-udp") {
pending
}
DisposableSys(scenario).verify()
}
}

View file

@ -714,6 +714,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case _ => None
}
context.become(tryingToJoin(address, joinDeadline))
logDebug("Trying to join [{}]", address)
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles, cluster.settings.AppVersion)
}
}

View file

@ -160,29 +160,41 @@ object MultiNodeSpec {
require(selfName != "", "multinode.host must not be empty")
/**
* Port number of this node. Defaults to 0 which means a random port.
* TCP Port number to be used when running tests on TCP. 0 means a random port.
*
* {{{
* -Dmultinode.port=0
* }}}
*/
val selfPort: Int = Integer.getInteger("multinode.port", 0)
val tcpPort: Int = Integer.getInteger("multinode.port", 0)
require(selfPort >= 0 && selfPort < 65535, "multinode.port is out of bounds: " + selfPort)
require(tcpPort >= 0 && tcpPort < 65535, "multinode.port is out of bounds: " + tcpPort)
/**
* UDP Port number to be used on this node. 0 means a random port.
* UDP Port number to be used when running tests on UDP. 0 means a random port.
*
* {{{
* -Dmultinode.udp-port=0
* -Dmultinode.udp.port=0
* }}}
*/
val udpPort: Option[Int] = Option(System.getProperty("multinode.udp-port")) match {
case None => None
case Some(_) => Some(Integer.getInteger("multinode.udp-port", 0))
}
val udpPort: Option[Int] =
Option(System.getProperty("multinode.udp.port")).map { _ =>
Integer.getInteger("multinode.udp.port", 0)
}
require(udpPort.getOrElse(1) >= 0 && udpPort.getOrElse(1) < 65535, "multinode.udp-port is out of bounds: " + udpPort)
require(udpPort.getOrElse(1) >= 0 && udpPort.getOrElse(1) < 65535, "multinode.udp.port is out of bounds: " + udpPort)
/**
* Port number of this node.
*
* This is defined in function of property `multinode.protocol`.
* If set to 'udp', udpPort will be used. If unset or any other value, it will default to tcpPort.
*/
val selfPort: Int =
System.getProperty("multinode.protocol") match {
case "udp" => udpPort.getOrElse(0)
case _ => tcpPort
}
/**
* Name (or IP address; must be resolvable using InetAddress.getByName)
@ -227,7 +239,7 @@ object MultiNodeSpec {
"akka.actor.provider" -> "remote",
"akka.remote.artery.canonical.hostname" -> selfName,
"akka.remote.classic.netty.tcp.hostname" -> selfName,
"akka.remote.classic.netty.tcp.port" -> selfPort,
"akka.remote.classic.netty.tcp.port" -> tcpPort,
"akka.remote.artery.canonical.port" -> selfPort))
private[testkit] val baseConfig: Config =
@ -257,9 +269,10 @@ object MultiNodeSpec {
ConfigFactory.parseMap(map.asJava)
}
// Multi node tests on kuberenetes require fixed ports to be mapped and exposed
// Multi node tests on kubernetes require fixed ports to be mapped and exposed
// This method change the port bindings to avoid conflicts
// Please note that with the current setup only port 5000 and 5001 are exposed in kubernetes
// Please note that with the current setup only port 5000 and 5001 (or 6000 and 6001 when using UDP)
// are exposed in kubernetes
def configureNextPortIfFixed(config: Config): Config = {
val arteryPortConfig = getNextPortString("akka.remote.artery.canonical.port", config)
val nettyPortConfig = getNextPortString("akka.remote.classic.netty.tcp.port", config)

View file

@ -19,9 +19,7 @@ import org.agrona.IoUtil
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.actor.Props
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.KillSwitches
import akka.stream.ThrottleMode
@ -50,7 +48,7 @@ class AeronStreamConsistencySpecMultiJvmNode1 extends AeronStreamConsistencySpec
class AeronStreamConsistencySpecMultiJvmNode2 extends AeronStreamConsistencySpec
abstract class AeronStreamConsistencySpec
extends MultiNodeSpec(AeronStreamConsistencySpec)
extends AeronStreamMultiNodeSpec(AeronStreamConsistencySpec)
with STMultiNodeSpec
with ImplicitSender {
@ -77,17 +75,6 @@ abstract class AeronStreamConsistencySpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val n = node(roleName)
val port = MultiNodeSpec.udpPort match {
case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
val giveUpMessageAfter = 30.seconds

View file

@ -26,9 +26,7 @@ import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.Done
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.KillSwitches
import akka.stream.ThrottleMode
@ -72,7 +70,7 @@ class AeronStreamLatencySpecMultiJvmNode1 extends AeronStreamLatencySpec
class AeronStreamLatencySpecMultiJvmNode2 extends AeronStreamLatencySpec
abstract class AeronStreamLatencySpec
extends MultiNodeSpec(AeronStreamLatencySpec)
extends AeronStreamMultiNodeSpec(AeronStreamLatencySpec)
with STMultiNodeSpec
with ImplicitSender {
@ -106,18 +104,6 @@ abstract class AeronStreamLatencySpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val n = node(roleName)
val port = MultiNodeSpec.udpPort match {
case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
val giveUpMessageAfter = 30.seconds

View file

@ -19,9 +19,7 @@ import io.aeron.driver.MediaDriver
import org.agrona.IoUtil
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.KillSwitches
import akka.stream.scaladsl.Source
@ -68,7 +66,7 @@ class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughput
class AeronStreamMaxThroughputSpecMultiJvmNode2 extends AeronStreamMaxThroughputSpec
abstract class AeronStreamMaxThroughputSpec
extends MultiNodeSpec(AeronStreamMaxThroughputSpec)
extends AeronStreamMultiNodeSpec(AeronStreamMaxThroughputSpec)
with STMultiNodeSpec
with ImplicitSender {
@ -106,17 +104,6 @@ abstract class AeronStreamMaxThroughputSpec
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val n = node(roleName)
val port = MultiNodeSpec.udpPort match {
case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
val streamId = 1
val giveUpMessageAfter = 30.seconds

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2016-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery.aeron
import akka.remote.artery.UdpPortActor
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
abstract class AeronStreamMultiNodeSpec(config: MultiNodeConfig) extends MultiNodeSpec(config) {
def channel(roleName: RoleName) = {
val n = node(roleName)
val port = MultiNodeSpec.udpPort match {
case None =>
system.actorSelection(n / "user" / "updPort") ! UdpPortActor.GetUdpPort
expectMsgType[Int]
case Some(p) => p
}
s"aeron:udp?endpoint=${n.address.host.get}:$port"
}
}

View file

@ -96,7 +96,7 @@ private[remote] class Encoder(
headerBuilder.setOutboundActorRefCompression(table)
}
private val changeClassManifsetCompressionCb = getAsyncCallback[CompressionTable[String]] { table =>
private val changeClassManifestCompressionCb = getAsyncCallback[CompressionTable[String]] { table =>
headerBuilder.setOutboundClassManifestCompression(table)
}
@ -219,7 +219,7 @@ private[remote] class Encoder(
* External call from ChangeOutboundCompression materialized value
*/
override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] =
changeClassManifsetCompressionCb.invokeWithFeedback(table)
changeClassManifestCompressionCb.invokeWithFeedback(table)
/**
* External call from ChangeOutboundCompression materialized value

View file

@ -148,6 +148,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
// make sure we only close the driver once or we will crash the JVM
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver =>
log.info("Stopping embedded media driver in directory [{}]", driver.aeronDirectoryName)
// this is only for embedded media driver
try driver.close()
catch {

View file

@ -1,6 +1,7 @@
#!/bin/bash
NUM_OF_NODES=$1
DEST_HOST_FILE=$2
PROTOCOL=$3
TMP_DIR=.tmp
kubectl delete deployments,services -l app=multi-node-test | true
@ -15,7 +16,7 @@ for i in `seq 1 "${NUM_OF_NODES}"`;
do
cat ./kubernetes/test-node-base.yaml | sed "s/test-nodeX/test-node${i}/" > ".tmp/test-node${i}.yml"
echo $i
echo "test-node${i}:/usr/local/openjdk-11/bin/java -Dmultinode.port=5000 -Dmultinode.udp-port=6000" >> ${DEST_HOST_FILE}
echo "test-node${i}:/usr/local/openjdk-11/bin/java -Dmultinode.protocol=$PROTOCOL -Dmultinode.port=5000 -Dmultinode.udp.port=6000" >> ${DEST_HOST_FILE}
done
kubectl apply -f ${TMP_DIR}

View file

@ -22,7 +22,7 @@ spec:
memory: "2Gi"
cpu: "1"
limits:
memory: "2Gi"
memory: "4Gi"
lifecycle:
postStart:
exec:
@ -51,6 +51,9 @@ spec:
- name: multi-node-udp
containerPort: 6000
protocol: UDP
- name: multi-node-udp2
containerPort: 6001
protocol: UDP
- name: server-multi
containerPort: 4711
protocol: TCP
@ -90,3 +93,7 @@ spec:
name: multi-node-udp
port: 6000
targetPort: 6000
- protocol: UDP
name: multi-node-udp2
port: 6001
targetPort: 6001

View file

@ -51,7 +51,7 @@ object MultiNode extends AutoPlugin {
// -Dmultinode.Djava.net.preferIPv4Stack=true -Dmultinode.Xmx512m -Dmultinode.XX:MaxPermSize=256M
// -DMultiJvm.akka.cluster.Stress.nrOfNodes=15
val MultinodeJvmArgs = "multinode\\.(D|X)(.*)".r
val knownPrefix = Set("multnode.", "akka.", "MultiJvm.")
val knownPrefix = Set("akka.", "MultiJvm.")
val akkaProperties = System.getProperties.stringPropertyNames.asScala.toList.collect {
case MultinodeJvmArgs(a, b) =>
val value = System.getProperty("multinode." + a + b)

View file

@ -572,7 +572,7 @@ object MultiJvmPlugin extends AutoPlugin {
} else {
if (hostsFile.exists && hostsFile.canRead)
s.log.info(
"Hosts from setting " + multiNodeHosts.key.label + " is overrriding file " + hostsFile.getAbsolutePath)
"Hosts from setting " + multiNodeHosts.key.label + " is overriding file " + hostsFile.getAbsolutePath)
hosts.toIndexedSeq
}