diff --git a/.github/workflows/scala3-build.yml b/.github/workflows/scala3-build.yml index ba8f1ea464..bec302ccc5 100644 --- a/.github/workflows/scala3-build.yml +++ b/.github/workflows/scala3-build.yml @@ -34,6 +34,7 @@ jobs: akka-actor-testkit-typed/test \ akka-actor-typed/compile \ akka-actor-typed-tests/test \ + akka-cluster/Test/compile \ akka-coordination/test \ akka-discovery/test \ akka-pki/test \ diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala index bb63dfb922..be7c3229f7 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala @@ -49,7 +49,7 @@ private[metrics] object MetricsCollector { /** Try to create collector instance in the order of priority. */ def apply(system: ActorSystem): MetricsCollector = { - val log = Logging(system, getClass) + val log = Logging(system, classOf[MetricsCollector]) val settings = ClusterMetricsSettings(system.settings.config) import settings._ diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index 0098da699d..f324f25382 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -15569,7 +15569,7 @@ public final class ClusterMessages { * * Protobuf type {@code VectorClock} */ - public static final class VectorClock extends + public static final class VectorClock extends akka.protobufv3.internal.GeneratedMessageV3 implements // @@protoc_insertion_point(message_implements:VectorClock) VectorClockOrBuilder { @@ -15691,7 +15691,7 @@ public final class ClusterMessages { /** * Protobuf type {@code VectorClock.Version} */ - public static final class Version extends + public static final class Version extends akka.protobufv3.internal.GeneratedMessageV3 implements // @@protoc_insertion_point(message_implements:VectorClock.Version) VersionOrBuilder { @@ -15977,23 +15977,23 @@ public final class ClusterMessages { } @java.lang.Override - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder() { + public Version.Builder newBuilderForType() { return newBuilder(); } + public static Version.Builder newBuilder() { return DEFAULT_INSTANCE.toBuilder(); } public static Builder newBuilder(akka.cluster.protobuf.msg.ClusterMessages.VectorClock.Version prototype) { return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); } @java.lang.Override - public Builder toBuilder() { + public Version.Builder toBuilder() { return this == DEFAULT_INSTANCE - ? new Builder() : new Builder().mergeFrom(this); + ? new Version.Builder() : new Version.Builder().mergeFrom(this); } @java.lang.Override - protected Builder newBuilderForType( + protected Version.Builder newBuilderForType( akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { - Builder builder = new Builder(parent); + Version.Builder builder = new Version.Builder(parent); return builder; } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 235b9791e0..6a874c6f33 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -522,7 +522,7 @@ object ClusterEvent { val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState)) val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState)) - currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter).to(immutable.IndexedSeq) + currentUnreachableDcs.diff(oldUnreachableDcs).iterator.map(UnreachableDataCenter.apply).to(immutable.IndexedSeq) } } @@ -541,7 +541,7 @@ object ClusterEvent { val oldUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(oldState)) val currentUnreachableDcs = otherDcs.filterNot(isDataCenterReachable(newState)) - oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter).to(immutable.IndexedSeq) + oldUnreachableDcs.diff(currentUnreachableDcs).iterator.map(ReachableDataCenter.apply).to(immutable.IndexedSeq) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 12f8edf7b2..bea274a3a9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -55,7 +55,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { private val eventBusListener: ActorRef = { cluster.system .systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { - override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def preStart(): Unit = cluster.subscribe(this.self, classOf[ClusterDomainEvent]) def receive: Receive = { case e: ClusterDomainEvent if !_closed => diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index e7d346abb7..47b2120e49 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -305,7 +305,6 @@ object MemberStatus { } object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] { - // for binary compatibility @deprecated("Use Long UID apply instead", since = "2.4.11") def apply(address: Address, uid: Int) = new UniqueAddress(address, uid.toLong) @@ -313,6 +312,9 @@ object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] { def apply(remoteUniqueAddress: akka.remote.UniqueAddress): UniqueAddress = new UniqueAddress(remoteUniqueAddress.address, remoteUniqueAddress.uid) + def apply(address: Address, longUid: Long) = new UniqueAddress(address, longUid) + + def unapply(address: UniqueAddress): Option[(Address, Long)] = Some((address.address, address.longUid)) } /** @@ -321,10 +323,28 @@ object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] { * incarnations of a member with same hostname and port. */ @SerialVersionUID(1L) -final case class UniqueAddress(address: Address, longUid: Long) extends Ordered[UniqueAddress] { +final class UniqueAddress(val address: Address, val longUid: Long) + extends Product + with Serializable + with Ordered[UniqueAddress] { override def hashCode = java.lang.Long.hashCode(longUid) + override def productArity: Int = 2 + override def productElement(n: Int): Any = n match { + case 0 => address + case 1 => longUid + } + override def canEqual(that: Any): Boolean = that.isInstanceOf[UniqueAddress] + + override def equals(obj: Any): Boolean = + obj match { + case ua: UniqueAddress => this.address.equals(ua.address) && this.longUid.equals(ua.longUid) + case _ => false + } + + override def toString = s"UniqueAddress($address,$longUid)" + def compare(that: UniqueAddress): Int = { val result = Member.addressOrdering.compare(this.address, that.address) if (result == 0) if (this.longUid < that.longUid) -1 else if (this.longUid == that.longUid) 0 else 1 diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 54ff70757f..4c5bb9acf6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -5,14 +5,11 @@ package akka.cluster.routing import java.util.concurrent.atomic.AtomicInteger - import scala.annotation.{ tailrec, varargs } import scala.collection.immutable - import scala.annotation.nowarn import com.typesafe.config.Config import com.typesafe.config.ConfigFactory - import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ @@ -31,6 +28,7 @@ import akka.routing.RouterActor import akka.routing.RouterConfig import akka.routing.RouterPoolActor import akka.routing.RoutingLogic +import akka.util.HashCode import akka.util.ccompat.JavaConverters._ object ClusterRouterGroupSettings { @@ -40,7 +38,7 @@ object ClusterRouterGroupSettings { routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterGroupSettings = - ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet) + new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet) @varargs def apply( @@ -48,28 +46,71 @@ object ClusterRouterGroupSettings { routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterGroupSettings = - ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet) + new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet) // For backwards compatibility, useRoles is the combination of use-roles and use-role def fromConfig(config: Config): ClusterRouterGroupSettings = - ClusterRouterGroupSettings( + new ClusterRouterGroupSettings( totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), routeesPaths = immutableSeq(config.getStringList("routees.paths")), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption( config.getString("cluster.use-role"))) + + def apply( + totalInstances: Int, + routeesPaths: immutable.Seq[String], + allowLocalRoutees: Boolean, + useRoles: Set[String]): ClusterRouterGroupSettings = + new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles) + + def unapply(settings: ClusterRouterGroupSettings): Option[(Int, immutable.Seq[String], Boolean, Set[String])] = + Some((settings.totalInstances, settings.routeesPaths, settings.allowLocalRoutees, settings.useRoles)) } /** * `totalInstances` of cluster router must be > 0 */ @SerialVersionUID(1L) -final case class ClusterRouterGroupSettings( - totalInstances: Int, - routeesPaths: immutable.Seq[String], - allowLocalRoutees: Boolean, - useRoles: Set[String]) - extends ClusterRouterSettingsBase { +final class ClusterRouterGroupSettings( + val totalInstances: Int, + val routeesPaths: immutable.Seq[String], + val allowLocalRoutees: Boolean, + val useRoles: Set[String]) + extends Product + with Serializable + with ClusterRouterSettingsBase { + + override def hashCode(): Int = { + var seed = HashCode.SEED + seed = HashCode.hash(seed, totalInstances) + seed = HashCode.hash(seed, routeesPaths) + seed = HashCode.hash(seed, allowLocalRoutees) + seed = HashCode.hash(seed, useRoles) + seed + } + + override def canEqual(that: Any): Boolean = that.isInstanceOf[ClusterRouterGroupSettings] + override def productArity: Int = 4 + override def productElement(n: Int): Any = n match { + case 0 => totalInstances + case 1 => routeesPaths + case 2 => allowLocalRoutees + case 3 => useRoles + } + + override def equals(obj: Any): Boolean = + obj match { + case that: ClusterRouterGroupSettings => + this.totalInstances.equals(that.totalInstances) && + this.routeesPaths.equals(that.routeesPaths) && + this.allowLocalRoutees == that.allowLocalRoutees && + this.useRoles.equals(that.useRoles) + case _ => false + } + + override def toString: String = + s"ClusterRouterGroupSettings($totalInstances,$routeesPaths,$allowLocalRoutees,$useRoles)" // For binary compatibility @deprecated("useRole has been replaced with useRoles", since = "2.5.4") @@ -135,13 +176,21 @@ final case class ClusterRouterGroupSettings( } object ClusterRouterPoolSettings { + + def apply( + totalInstances: Int, + maxInstancesPerNode: Int, + allowLocalRoutees: Boolean, + useRoles: Set[String]): ClusterRouterPoolSettings = + new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles) + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") def apply( totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterPoolSettings = - ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet) + new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet) @varargs def apply( @@ -149,16 +198,19 @@ object ClusterRouterPoolSettings { maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterPoolSettings = - ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet) + new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet) // For backwards compatibility, useRoles is the combination of use-roles and use-role def fromConfig(config: Config): ClusterRouterPoolSettings = - ClusterRouterPoolSettings( + new ClusterRouterPoolSettings( totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption( config.getString("cluster.use-role"))) + + def unapply(settings: ClusterRouterPoolSettings): Option[(Int, Int, Boolean, Set[String])] = + Some((settings.totalInstances, settings.maxInstancesPerNode, settings.allowLocalRoutees, settings.useRoles)) } /** @@ -167,12 +219,45 @@ object ClusterRouterPoolSettings { * `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined */ @SerialVersionUID(1L) -final case class ClusterRouterPoolSettings( - totalInstances: Int, - maxInstancesPerNode: Int, - allowLocalRoutees: Boolean, - useRoles: Set[String]) - extends ClusterRouterSettingsBase { +final class ClusterRouterPoolSettings( + val totalInstances: Int, + val maxInstancesPerNode: Int, + val allowLocalRoutees: Boolean, + val useRoles: Set[String]) + extends Product + with Serializable + with ClusterRouterSettingsBase { + + override def hashCode(): Int = { + var seed = HashCode.SEED + seed = HashCode.hash(seed, totalInstances) + seed = HashCode.hash(seed, maxInstancesPerNode) + seed = HashCode.hash(seed, allowLocalRoutees) + seed = HashCode.hash(seed, useRoles) + seed + } + + override def canEqual(that: Any): Boolean = that.isInstanceOf[ClusterRouterPoolSettings] + override def productArity: Int = 4 + override def productElement(n: Int): Any = n match { + case 0 => totalInstances + case 1 => maxInstancesPerNode + case 2 => allowLocalRoutees + case 3 => useRoles + } + + override def equals(obj: Any): Boolean = + obj match { + case that: ClusterRouterPoolSettings => + this.totalInstances.equals(that.totalInstances) && + this.maxInstancesPerNode.equals(that.maxInstancesPerNode) && + this.allowLocalRoutees == that.allowLocalRoutees && + this.useRoles.equals(that.useRoles) + case _ => false + } + + override def toString: String = + s"ClusterRouterPoolSettings($totalInstances,$maxInstancesPerNode,$allowLocalRoutees,$useRoles)" // For binary compatibility @deprecated("useRole has been replaced with useRoles", since = "2.5.4") diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala index f4a6fd78f0..eec0a3289f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala @@ -374,7 +374,7 @@ import akka.remote.artery.ThisActorSystemQuarantinedEvent log.error(t, "SBR acquire of lease failed") false } - .map(AcquireLeaseResult) + .map(AcquireLeaseResult.apply) .pipeTo(self)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala index 41a4fa6bf0..13d837f7a0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory +import akka.actor.ActorRef import akka.actor.ActorSelection import akka.annotation.InternalApi import akka.remote.testconductor.RoleName @@ -112,7 +113,7 @@ abstract class MultiDcHeartbeatTakingOverSpec } "be healthy" taggedAs LongRunningTest in within(5.seconds) { - implicit val sender = observer.ref + implicit val sender: ActorRef = observer.ref runOn(expectedAlphaHeartbeaterRoles.toList: _*) { awaitAssert { selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus() @@ -156,7 +157,7 @@ abstract class MultiDcHeartbeatTakingOverSpec enterBarrier("after-alpha-monitoring-node-left") - implicit val sender = observer.ref + implicit val sender: ActorRef = observer.ref val expectedAlphaMonitoringNodesAfterLeaving = (takeNOldestMembers(dataCenter = "alpha", 3).filterNot(_.status == MemberStatus.Exiting)) runOn(membersAsRoles(expectedAlphaMonitoringNodesAfterLeaving).toList: _*) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala index 4d352d460a..4ca7f73dc1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory +import akka.actor.ActorRef import akka.annotation.InternalApi import akka.remote.testconductor.RoleName import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } @@ -94,7 +95,7 @@ abstract class MultiDcSunnyWeatherSpec expectedAlphaHeartbeaterRoles.size should ===(2) expectedBetaHeartbeaterRoles.size should ===(2) - implicit val sender = observer.ref + implicit val sender: ActorRef = observer.ref runOn(expectedAlphaHeartbeaterRoles.toList: _*) { selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus() observer.expectMsgType[CrossDcHeartbeatSender.MonitoringActive](5.seconds) @@ -120,7 +121,7 @@ abstract class MultiDcSunnyWeatherSpec enterBarrier("checking-activeReceivers") - implicit val sender = observer.ref + implicit val sender: ActorRef = observer.ref selectCrossDcHeartbeatSender ! CrossDcHeartbeatSender.ReportStatus() observer.expectMsgType[CrossDcHeartbeatSender.MonitoringStateReport](5.seconds) match { case CrossDcHeartbeatSender.MonitoringDormant() => // ok ... diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 54d37d9ded..6aedcbc34d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -127,12 +127,12 @@ class ClusterDomainEventSpec extends AnyWordSpec with Matchers with BeforeAndAft diffUnreachableDataCenter( MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5), MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===( - otherDc.map(UnreachableDataCenter)) + otherDc.map(UnreachableDataCenter.apply)) diffReachableDataCenter( MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5), MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===( - otherDc.map(ReachableDataCenter)) + otherDc.map(ReachableDataCenter.apply)) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckerSpec.scala index 4e81ecfd26..0965a6df4e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/JoinConfigCompatCheckerSpec.scala @@ -620,7 +620,7 @@ class JoinConfigCompatCheckerSpec extends AkkaSpec with ClusterTestKit { val clusterTestUtil = new ClusterTestUtil(system.name) try { val sys = clusterTestUtil.newActorSystem(joinNodeConfig.withFallback(configWithChecker)) - Cluster(sys).settings.ConfigCompatCheckers should ===(Set.empty) + Cluster(sys).settings.ConfigCompatCheckers should ===(Set.empty[String]) } finally { clusterTestUtil.shutdownAll() }