scala3: akka-cluster (#30417)
Co-authored-by: Arnout Engelen <arnout@bzzt.net>
This commit is contained in:
parent
f3fb2a577d
commit
4129a75db0
12 changed files with 151 additions and 43 deletions
1
.github/workflows/scala3-build.yml
vendored
1
.github/workflows/scala3-build.yml
vendored
|
|
@ -34,6 +34,7 @@ jobs:
|
|||
akka-actor-testkit-typed/test \
|
||||
akka-actor-typed/compile \
|
||||
akka-actor-typed-tests/test \
|
||||
akka-cluster/Test/compile \
|
||||
akka-coordination/test \
|
||||
akka-discovery/test \
|
||||
akka-pki/test \
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ private[metrics] object MetricsCollector {
|
|||
|
||||
/** Try to create collector instance in the order of priority. */
|
||||
def apply(system: ActorSystem): MetricsCollector = {
|
||||
val log = Logging(system, getClass)
|
||||
val log = Logging(system, classOf[MetricsCollector])
|
||||
val settings = ClusterMetricsSettings(system.settings.config)
|
||||
import settings._
|
||||
|
||||
|
|
|
|||
|
|
@ -15569,7 +15569,7 @@ public final class ClusterMessages {
|
|||
*
|
||||
* Protobuf type {@code VectorClock}
|
||||
*/
|
||||
public static final class VectorClock extends
|
||||
public static final class VectorClock extends
|
||||
akka.protobufv3.internal.GeneratedMessageV3 implements
|
||||
// @@protoc_insertion_point(message_implements:VectorClock)
|
||||
VectorClockOrBuilder {
|
||||
|
|
@ -15691,7 +15691,7 @@ public final class ClusterMessages {
|
|||
/**
|
||||
* Protobuf type {@code VectorClock.Version}
|
||||
*/
|
||||
public static final class Version extends
|
||||
public static final class Version extends
|
||||
akka.protobufv3.internal.GeneratedMessageV3 implements
|
||||
// @@protoc_insertion_point(message_implements:VectorClock.Version)
|
||||
VersionOrBuilder {
|
||||
|
|
@ -15977,23 +15977,23 @@ public final class ClusterMessages {
|
|||
}
|
||||
|
||||
@java.lang.Override
|
||||
public Builder newBuilderForType() { return newBuilder(); }
|
||||
public static Builder newBuilder() {
|
||||
public Version.Builder newBuilderForType() { return newBuilder(); }
|
||||
public static Version.Builder newBuilder() {
|
||||
return DEFAULT_INSTANCE.toBuilder();
|
||||
}
|
||||
public static Builder newBuilder(akka.cluster.protobuf.msg.ClusterMessages.VectorClock.Version prototype) {
|
||||
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
|
||||
}
|
||||
@java.lang.Override
|
||||
public Builder toBuilder() {
|
||||
public Version.Builder toBuilder() {
|
||||
return this == DEFAULT_INSTANCE
|
||||
? new Builder() : new Builder().mergeFrom(this);
|
||||
? new Version.Builder() : new Version.Builder().mergeFrom(this);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
protected Builder newBuilderForType(
|
||||
protected Version.Builder newBuilderForType(
|
||||
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
|
||||
Builder builder = new Builder(parent);
|
||||
Version.Builder builder = new Version.Builder(parent);
|
||||
return builder;
|
||||
}
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -522,7 +522,7 @@ object ClusterEvent {
|
|||
val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState))
|
||||
val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState))
|
||||
|
||||
currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter).to(immutable.IndexedSeq)
|
||||
currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter.apply).to(immutable.IndexedSeq)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -541,7 +541,7 @@ object ClusterEvent {
|
|||
val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState))
|
||||
val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState))
|
||||
|
||||
oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter).to(immutable.IndexedSeq)
|
||||
oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter.apply).to(immutable.IndexedSeq)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
private val eventBusListener: ActorRef = {
|
||||
cluster.system
|
||||
.systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
|
||||
override def preStart(): Unit = cluster.subscribe(this.self, classOf[ClusterDomainEvent])
|
||||
|
||||
def receive: Receive = {
|
||||
case e: ClusterDomainEvent if !_closed =>
|
||||
|
|
|
|||
|
|
@ -305,7 +305,6 @@ object MemberStatus {
|
|||
}
|
||||
|
||||
object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] {
|
||||
|
||||
// for binary compatibility
|
||||
@deprecated("Use Long UID apply instead", since = "2.4.11")
|
||||
def apply(address: Address, uid: Int) = new UniqueAddress(address, uid.toLong)
|
||||
|
|
@ -313,6 +312,9 @@ object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] {
|
|||
def apply(remoteUniqueAddress: akka.remote.UniqueAddress): UniqueAddress =
|
||||
new UniqueAddress(remoteUniqueAddress.address, remoteUniqueAddress.uid)
|
||||
|
||||
def apply(address: Address, longUid: Long) = new UniqueAddress(address, longUid)
|
||||
|
||||
def unapply(address: UniqueAddress): Option[(Address, Long)] = Some((address.address, address.longUid))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -321,10 +323,28 @@ object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] {
|
|||
* incarnations of a member with same hostname and port.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final case class UniqueAddress(address: Address, longUid: Long) extends Ordered[UniqueAddress] {
|
||||
final class UniqueAddress(val address: Address, val longUid: Long)
|
||||
extends Product
|
||||
with Serializable
|
||||
with Ordered[UniqueAddress] {
|
||||
|
||||
override def hashCode = java.lang.Long.hashCode(longUid)
|
||||
|
||||
override def productArity: Int = 2
|
||||
override def productElement(n: Int): Any = n match {
|
||||
case 0 => address
|
||||
case 1 => longUid
|
||||
}
|
||||
override def canEqual(that: Any): Boolean = that.isInstanceOf[UniqueAddress]
|
||||
|
||||
override def equals(obj: Any): Boolean =
|
||||
obj match {
|
||||
case ua: UniqueAddress => this.address.equals(ua.address) && this.longUid.equals(ua.longUid)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
override def toString = s"UniqueAddress($address,$longUid)"
|
||||
|
||||
def compare(that: UniqueAddress): Int = {
|
||||
val result = Member.addressOrdering.compare(this.address, that.address)
|
||||
if (result == 0) if (this.longUid < that.longUid) -1 else if (this.longUid == that.longUid) 0 else 1
|
||||
|
|
|
|||
|
|
@ -5,14 +5,11 @@
|
|||
package akka.cluster.routing
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.annotation.{ tailrec, varargs }
|
||||
import scala.collection.immutable
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.actor._
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.ClusterEvent._
|
||||
|
|
@ -31,6 +28,7 @@ import akka.routing.RouterActor
|
|||
import akka.routing.RouterConfig
|
||||
import akka.routing.RouterPoolActor
|
||||
import akka.routing.RoutingLogic
|
||||
import akka.util.HashCode
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
|
||||
object ClusterRouterGroupSettings {
|
||||
|
|
@ -40,7 +38,7 @@ object ClusterRouterGroupSettings {
|
|||
routeesPaths: immutable.Seq[String],
|
||||
allowLocalRoutees: Boolean,
|
||||
useRole: Option[String]): ClusterRouterGroupSettings =
|
||||
ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet)
|
||||
new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet)
|
||||
|
||||
@varargs
|
||||
def apply(
|
||||
|
|
@ -48,28 +46,71 @@ object ClusterRouterGroupSettings {
|
|||
routeesPaths: immutable.Seq[String],
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: String*): ClusterRouterGroupSettings =
|
||||
ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet)
|
||||
new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet)
|
||||
|
||||
// For backwards compatibility, useRoles is the combination of use-roles and use-role
|
||||
def fromConfig(config: Config): ClusterRouterGroupSettings =
|
||||
ClusterRouterGroupSettings(
|
||||
new ClusterRouterGroupSettings(
|
||||
totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config),
|
||||
routeesPaths = immutableSeq(config.getStringList("routees.paths")),
|
||||
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
|
||||
useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(
|
||||
config.getString("cluster.use-role")))
|
||||
|
||||
def apply(
|
||||
totalInstances: Int,
|
||||
routeesPaths: immutable.Seq[String],
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: Set[String]): ClusterRouterGroupSettings =
|
||||
new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles)
|
||||
|
||||
def unapply(settings: ClusterRouterGroupSettings): Option[(Int, immutable.Seq[String], Boolean, Set[String])] =
|
||||
Some((settings.totalInstances, settings.routeesPaths, settings.allowLocalRoutees, settings.useRoles))
|
||||
}
|
||||
|
||||
/**
|
||||
* `totalInstances` of cluster router must be > 0
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final case class ClusterRouterGroupSettings(
|
||||
totalInstances: Int,
|
||||
routeesPaths: immutable.Seq[String],
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: Set[String])
|
||||
extends ClusterRouterSettingsBase {
|
||||
final class ClusterRouterGroupSettings(
|
||||
val totalInstances: Int,
|
||||
val routeesPaths: immutable.Seq[String],
|
||||
val allowLocalRoutees: Boolean,
|
||||
val useRoles: Set[String])
|
||||
extends Product
|
||||
with Serializable
|
||||
with ClusterRouterSettingsBase {
|
||||
|
||||
override def hashCode(): Int = {
|
||||
var seed = HashCode.SEED
|
||||
seed = HashCode.hash(seed, totalInstances)
|
||||
seed = HashCode.hash(seed, routeesPaths)
|
||||
seed = HashCode.hash(seed, allowLocalRoutees)
|
||||
seed = HashCode.hash(seed, useRoles)
|
||||
seed
|
||||
}
|
||||
|
||||
override def canEqual(that: Any): Boolean = that.isInstanceOf[ClusterRouterGroupSettings]
|
||||
override def productArity: Int = 4
|
||||
override def productElement(n: Int): Any = n match {
|
||||
case 0 => totalInstances
|
||||
case 1 => routeesPaths
|
||||
case 2 => allowLocalRoutees
|
||||
case 3 => useRoles
|
||||
}
|
||||
|
||||
override def equals(obj: Any): Boolean =
|
||||
obj match {
|
||||
case that: ClusterRouterGroupSettings =>
|
||||
this.totalInstances.equals(that.totalInstances) &&
|
||||
this.routeesPaths.equals(that.routeesPaths) &&
|
||||
this.allowLocalRoutees == that.allowLocalRoutees &&
|
||||
this.useRoles.equals(that.useRoles)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
override def toString: String =
|
||||
s"ClusterRouterGroupSettings($totalInstances,$routeesPaths,$allowLocalRoutees,$useRoles)"
|
||||
|
||||
// For binary compatibility
|
||||
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
|
||||
|
|
@ -135,13 +176,21 @@ final case class ClusterRouterGroupSettings(
|
|||
}
|
||||
|
||||
object ClusterRouterPoolSettings {
|
||||
|
||||
def apply(
|
||||
totalInstances: Int,
|
||||
maxInstancesPerNode: Int,
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: Set[String]): ClusterRouterPoolSettings =
|
||||
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles)
|
||||
|
||||
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
|
||||
def apply(
|
||||
totalInstances: Int,
|
||||
maxInstancesPerNode: Int,
|
||||
allowLocalRoutees: Boolean,
|
||||
useRole: Option[String]): ClusterRouterPoolSettings =
|
||||
ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet)
|
||||
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet)
|
||||
|
||||
@varargs
|
||||
def apply(
|
||||
|
|
@ -149,16 +198,19 @@ object ClusterRouterPoolSettings {
|
|||
maxInstancesPerNode: Int,
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: String*): ClusterRouterPoolSettings =
|
||||
ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet)
|
||||
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet)
|
||||
|
||||
// For backwards compatibility, useRoles is the combination of use-roles and use-role
|
||||
def fromConfig(config: Config): ClusterRouterPoolSettings =
|
||||
ClusterRouterPoolSettings(
|
||||
new ClusterRouterPoolSettings(
|
||||
totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config),
|
||||
maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"),
|
||||
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
|
||||
useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(
|
||||
config.getString("cluster.use-role")))
|
||||
|
||||
def unapply(settings: ClusterRouterPoolSettings): Option[(Int, Int, Boolean, Set[String])] =
|
||||
Some((settings.totalInstances, settings.maxInstancesPerNode, settings.allowLocalRoutees, settings.useRoles))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -167,12 +219,45 @@ object ClusterRouterPoolSettings {
|
|||
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final case class ClusterRouterPoolSettings(
|
||||
totalInstances: Int,
|
||||
maxInstancesPerNode: Int,
|
||||
allowLocalRoutees: Boolean,
|
||||
useRoles: Set[String])
|
||||
extends ClusterRouterSettingsBase {
|
||||
final class ClusterRouterPoolSettings(
|
||||
val totalInstances: Int,
|
||||
val maxInstancesPerNode: Int,
|
||||
val allowLocalRoutees: Boolean,
|
||||
val useRoles: Set[String])
|
||||
extends Product
|
||||
with Serializable
|
||||
with ClusterRouterSettingsBase {
|
||||
|
||||
override def hashCode(): Int = {
|
||||
var seed = HashCode.SEED
|
||||
seed = HashCode.hash(seed, totalInstances)
|
||||
seed = HashCode.hash(seed, maxInstancesPerNode)
|
||||
seed = HashCode.hash(seed, allowLocalRoutees)
|
||||
seed = HashCode.hash(seed, useRoles)
|
||||
seed
|
||||
}
|
||||
|
||||
override def canEqual(that: Any): Boolean = that.isInstanceOf[ClusterRouterPoolSettings]
|
||||
override def productArity: Int = 4
|
||||
override def productElement(n: Int): Any = n match {
|
||||
case 0 => totalInstances
|
||||
case 1 => maxInstancesPerNode
|
||||
case 2 => allowLocalRoutees
|
||||
case 3 => useRoles
|
||||
}
|
||||
|
||||
override def equals(obj: Any): Boolean =
|
||||
obj match {
|
||||
case that: ClusterRouterPoolSettings =>
|
||||
this.totalInstances.equals(that.totalInstances) &&
|
||||
this.maxInstancesPerNode.equals(that.maxInstancesPerNode) &&
|
||||
this.allowLocalRoutees == that.allowLocalRoutees &&
|
||||
this.useRoles.equals(that.useRoles)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
override def toString: String =
|
||||
s"ClusterRouterPoolSettings($totalInstances,$maxInstancesPerNode,$allowLocalRoutees,$useRoles)"
|
||||
|
||||
// For binary compatibility
|
||||
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
|
||||
|
|
|
|||
|
|
@ -374,7 +374,7 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent
|
|||
log.error(t, "SBR acquire of lease failed")
|
||||
false
|
||||
}
|
||||
.map(AcquireLeaseResult)
|
||||
.map(AcquireLeaseResult.apply)
|
||||
.pipeTo(self))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import scala.concurrent.duration._
|
|||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSelection
|
||||
import akka.annotation.InternalApi
|
||||
import akka.remote.testconductor.RoleName
|
||||
|
|
@ -112,7 +113,7 @@ abstract class MultiDcHeartbeatTakingOverSpec
|
|||
}
|
||||
|
||||
"be healthy" taggedAs LongRunningTest in within(5.seconds) {
|
||||
implicit val sender = observer.ref
|
||||
implicit val sender: ActorRef = observer.ref
|
||||
runOn(expectedAlphaHeartbeaterRoles.toList: _*) {
|
||||
awaitAssert {
|
||||
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
|
||||
|
|
@ -156,7 +157,7 @@ abstract class MultiDcHeartbeatTakingOverSpec
|
|||
|
||||
enterBarrier("after-alpha-monitoring-node-left")
|
||||
|
||||
implicit val sender = observer.ref
|
||||
implicit val sender: ActorRef = observer.ref
|
||||
val expectedAlphaMonitoringNodesAfterLeaving =
|
||||
(takeNOldestMembers(dataCenter = "alpha", 3).filterNot(_.status == MemberStatus.Exiting))
|
||||
runOn(membersAsRoles(expectedAlphaMonitoringNodesAfterLeaving).toList: _*) {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import scala.concurrent.duration._
|
|||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
|
||||
|
|
@ -94,7 +95,7 @@ abstract class MultiDcSunnyWeatherSpec
|
|||
expectedAlphaHeartbeaterRoles.size should ===(2)
|
||||
expectedBetaHeartbeaterRoles.size should ===(2)
|
||||
|
||||
implicit val sender = observer.ref
|
||||
implicit val sender: ActorRef = observer.ref
|
||||
runOn(expectedAlphaHeartbeaterRoles.toList: _*) {
|
||||
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
|
||||
observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds)
|
||||
|
|
@ -120,7 +121,7 @@ abstract class MultiDcSunnyWeatherSpec
|
|||
|
||||
enterBarrier("checking-activeReceivers")
|
||||
|
||||
implicit val sender = observer.ref
|
||||
implicit val sender: ActorRef = observer.ref
|
||||
selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus()
|
||||
observer.expectMsgType[CrossDcHeartbeatSender.MonitoringStateReport](5.seconds) match {
|
||||
case CrossDcHeartbeatSender.MonitoringDormant() => // ok ...
|
||||
|
|
|
|||
|
|
@ -127,12 +127,12 @@ class ClusterDomainEventSpec extends AnyWordSpec with Matchers with BeforeAndAft
|
|||
diffUnreachableDataCenter(
|
||||
MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5),
|
||||
MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(
|
||||
otherDc.map(UnreachableDataCenter))
|
||||
otherDc.map(UnreachableDataCenter.apply))
|
||||
|
||||
diffReachableDataCenter(
|
||||
MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5),
|
||||
MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(
|
||||
otherDc.map(ReachableDataCenter))
|
||||
otherDc.map(ReachableDataCenter.apply))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -620,7 +620,7 @@ class JoinConfigCompatCheckerSpec extends AkkaSpec with ClusterTestKit {
|
|||
val clusterTestUtil = new ClusterTestUtil(system.name)
|
||||
try {
|
||||
val sys = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker))
|
||||
Cluster(sys).settings.ConfigCompatCheckers should ===(Set.empty)
|
||||
Cluster(sys).settings.ConfigCompatCheckers should ===(Set.empty[String])
|
||||
} finally {
|
||||
clusterTestUtil.shutdownAll()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue