diff --git a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala index dee09b0def..cc087a8182 100644 --- a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala +++ b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsRoutingSpec.scala @@ -138,7 +138,7 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa val router = system.actorOf( ClusterRouterPool( local = AdaptiveLoadBalancingPool(HeapMetricsSelector), - settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). + settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true)). props(Props[Echo]), name) // it may take some time until router receives cluster member events diff --git a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala index d53d24d7c0..6b7891ee74 100644 --- a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala +++ b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala @@ -45,7 +45,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig { cluster { enabled = on allow-local-routees = on - use-role = compute + use-roles = ["compute"] } } } diff --git a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala index 6fd82f4e07..3fd57f14f5 100644 --- a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala +++ b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala @@ -57,7 +57,7 @@ abstract class StatsService2 extends Actor { val workerRouter = context.actorOf( ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings( totalInstances = 100, routeesPaths = List("/user/statsWorker"), - allowLocalRoutees = true, useRole = Some("compute"))).props(), + allowLocalRoutees = true, useRoles = Set("compute"))).props(), name = "workerRouter2") //#router-lookup-in-code } @@ -71,7 +71,7 @@ abstract class StatsService3 extends Actor { val workerRouter = context.actorOf( ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings( totalInstances = 100, maxInstancesPerNode = 3, - allowLocalRoutees = false, useRole = None)).props(Props[StatsWorker]), + allowLocalRoutees = false)).props(Props[StatsWorker]), name = "workerRouter3") //#router-deploy-in-code } diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index 9e8b1ce958..577f9c8ad6 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -13149,6 +13149,26 @@ public final class ClusterMessages { */ akka.protobuf.ByteString getUseRoleBytes(); + + // repeated string useRoles = 5; + /** + * repeated string useRoles = 5; + */ + java.util.List + getUseRolesList(); + /** + * repeated string useRoles = 5; + */ + int getUseRolesCount(); + /** + * repeated string useRoles = 5; + */ + java.lang.String getUseRoles(int index); + /** + * repeated string useRoles = 5; + */ + akka.protobuf.ByteString + getUseRolesBytes(int index); } /** * Protobuf type {@code ClusterRouterPoolSettings} @@ -13221,6 +13241,14 @@ public final class ClusterMessages { useRole_ = input.readBytes(); break; } + case 42: { + if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + useRoles_ = new akka.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000010; + } + useRoles_.add(input.readBytes()); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -13229,6 +13257,9 @@ public final class ClusterMessages { throw new akka.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { + if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + useRoles_ = new akka.protobuf.UnmodifiableLazyStringList(useRoles_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -13352,11 +13383,42 @@ public final class ClusterMessages { } } + // repeated string useRoles = 5; + public static final int USEROLES_FIELD_NUMBER = 5; + private akka.protobuf.LazyStringList useRoles_; + /** + * repeated string useRoles = 5; + */ + public java.util.List + getUseRolesList() { + return useRoles_; + } + /** + * repeated string useRoles = 5; + */ + public int getUseRolesCount() { + return useRoles_.size(); + } + /** + * repeated string useRoles = 5; + */ + public java.lang.String getUseRoles(int index) { + return useRoles_.get(index); + } + /** + * repeated string useRoles = 5; + */ + public akka.protobuf.ByteString + getUseRolesBytes(int index) { + return useRoles_.getByteString(index); + } + private void initFields() { totalInstances_ = 0; maxInstancesPerNode_ = 0; allowLocalRoutees_ = false; useRole_ = ""; + useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13394,6 +13456,9 @@ public final class ClusterMessages { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, getUseRoleBytes()); } + for (int i = 0; i < useRoles_.size(); i++) { + output.writeBytes(5, useRoles_.getByteString(i)); + } getUnknownFields().writeTo(output); } @@ -13419,6 +13484,15 @@ public final class ClusterMessages { size += akka.protobuf.CodedOutputStream .computeBytesSize(4, getUseRoleBytes()); } + { + int dataSize = 0; + for (int i = 0; i < useRoles_.size(); i++) { + dataSize += akka.protobuf.CodedOutputStream + .computeBytesSizeNoTag(useRoles_.getByteString(i)); + } + size += dataSize; + size += 1 * getUseRolesList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -13543,6 +13617,8 @@ public final class ClusterMessages { bitField0_ = (bitField0_ & ~0x00000004); useRole_ = ""; bitField0_ = (bitField0_ & ~0x00000008); + useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -13587,6 +13663,12 @@ public final class ClusterMessages { to_bitField0_ |= 0x00000008; } result.useRole_ = useRole_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + useRoles_ = new akka.protobuf.UnmodifiableLazyStringList( + useRoles_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.useRoles_ = useRoles_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -13617,6 +13699,16 @@ public final class ClusterMessages { useRole_ = other.useRole_; onChanged(); } + if (!other.useRoles_.isEmpty()) { + if (useRoles_.isEmpty()) { + useRoles_ = other.useRoles_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureUseRolesIsMutable(); + useRoles_.addAll(other.useRoles_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -13829,6 +13921,99 @@ public final class ClusterMessages { return this; } + // repeated string useRoles = 5; + private akka.protobuf.LazyStringList useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY; + private void ensureUseRolesIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + useRoles_ = new akka.protobuf.LazyStringArrayList(useRoles_); + bitField0_ |= 0x00000010; + } + } + /** + * repeated string useRoles = 5; + */ + public java.util.List + getUseRolesList() { + return java.util.Collections.unmodifiableList(useRoles_); + } + /** + * repeated string useRoles = 5; + */ + public int getUseRolesCount() { + return useRoles_.size(); + } + /** + * repeated string useRoles = 5; + */ + public java.lang.String getUseRoles(int index) { + return useRoles_.get(index); + } + /** + * repeated string useRoles = 5; + */ + public akka.protobuf.ByteString + getUseRolesBytes(int index) { + return useRoles_.getByteString(index); + } + /** + * repeated string useRoles = 5; + */ + public Builder setUseRoles( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureUseRolesIsMutable(); + useRoles_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string useRoles = 5; + */ + public Builder addUseRoles( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureUseRolesIsMutable(); + useRoles_.add(value); + onChanged(); + return this; + } + /** + * repeated string useRoles = 5; + */ + public Builder addAllUseRoles( + java.lang.Iterable values) { + ensureUseRolesIsMutable(); + super.addAll(values, useRoles_); + onChanged(); + return this; + } + /** + * repeated string useRoles = 5; + */ + public Builder clearUseRoles() { + useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + onChanged(); + return this; + } + /** + * repeated string useRoles = 5; + */ + public Builder addUseRolesBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureUseRolesIsMutable(); + useRoles_.add(value); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ClusterRouterPoolSettings) } @@ -13967,15 +14152,16 @@ public final class ClusterMessages { "\001(\r\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001 \002(\0132\005" + ".Pool\022,\n\010settings\030\002 \002(\0132\032.ClusterRouterP" + "oolSettings\"<\n\004Pool\022\024\n\014serializerId\030\001 \002(" + - "\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"|\n\031Clu" + - "sterRouterPoolSettings\022\026\n\016totalInstances" + - "\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021a" + - "llowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t*" + - "D\n\022ReachabilityStatus\022\r\n\tReachable\020\000\022\017\n\013" + - "Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014MemberS" + - "tatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022", - "\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010W" + - "eaklyUp\020\006B\035\n\031akka.cluster.protobuf.msgH\001" + "\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"\216\001\n\031Cl" + + "usterRouterPoolSettings\022\026\n\016totalInstance" + + "s\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021" + + "allowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t" + + "\022\020\n\010useRoles\030\005 \003(\t*D\n\022ReachabilityStatus" + + "\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTerm" + + "inated\020\002*b\n\014MemberStatus\022\013\n\007Joining\020\000\022\006\n", + "\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020" + + "\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.clu" + + "ster.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14083,7 +14269,7 @@ public final class ClusterMessages { internal_static_ClusterRouterPoolSettings_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ClusterRouterPoolSettings_descriptor, - new java.lang.String[] { "TotalInstances", "MaxInstancesPerNode", "AllowLocalRoutees", "UseRole", }); + new java.lang.String[] { "TotalInstances", "MaxInstancesPerNode", "AllowLocalRoutees", "UseRole", "UseRoles", }); return null; } }; diff --git a/akka-cluster/src/main/mima-filters/2.5.4.backwards.excludes b/akka-cluster/src/main/mima-filters/2.5.4.backwards.excludes new file mode 100644 index 0000000000..51edebc818 --- /dev/null +++ b/akka-cluster/src/main/mima-filters/2.5.4.backwards.excludes @@ -0,0 +1,7 @@ +# #23257 replace ClusterRouterGroup/Pool "use-role" with "use-roles" +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRoles") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesBytes") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesCount") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesList") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.routing.ClusterRouterSettingsBase.useRole") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.routing.ClusterRouterSettingsBase.useRoles") \ No newline at end of file diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index fa4358dfac..b331dafd82 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -223,4 +223,5 @@ message UniqueAddress { required uint32 maxInstancesPerNode = 2; required bool allowLocalRoutees = 3; optional string useRole = 4; + repeated string useRoles = 5; } \ No newline at end of file diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d53aece519..606ad1c54b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -234,9 +234,12 @@ akka { # Useful for master-worker scenario where all routees are remote. allow-local-routees = on + # Use members with all specified roles, or all members if undefined or empty. + use-roles = [] + + # Deprecated, since Akka 2.5.4, replaced by use-roles # Use members with specified role, or all members if undefined or empty. use-role = "" - } # Protobuf serializer for cluster messages diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 23c8834001..06a8bf853a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -13,8 +13,8 @@ import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWi import akka.protobuf.{ ByteString, MessageLite } import scala.annotation.tailrec -import scala.collection.JavaConverters._ import scala.collection.immutable +import scala.collection.JavaConverters._ import scala.concurrent.duration.Deadline import java.io.NotSerializableException @@ -166,8 +166,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri builder.setAllowLocalRoutees(settings.allowLocalRoutees) .setMaxInstancesPerNode(settings.maxInstancesPerNode) .setTotalInstances(settings.totalInstances) + .addAllUseRoles(settings.useRoles.asJava) + // for backwards compatibility settings.useRole.foreach(builder.setUseRole) + builder.build() } @@ -378,11 +381,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri } private def clusterRouterPoolSettingsFromProto(crps: cm.ClusterRouterPoolSettings): ClusterRouterPoolSettings = { + // For backwards compatibility, useRoles is the combination of getUseRole and getUseRolesList ClusterRouterPoolSettings( totalInstances = crps.getTotalInstances, maxInstancesPerNode = crps.getMaxInstancesPerNode, allowLocalRoutees = crps.getAllowLocalRoutees, - useRole = if (crps.hasUseRole) Some(crps.getUseRole) else None + useRoles = if (crps.hasUseRole) { crps.getUseRolesList.asScala.toSet + crps.getUseRole } else { crps.getUseRolesList.asScala.toSet } ) } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index a1697d39a2..cc707ffbfb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -26,16 +26,26 @@ import akka.routing.RoutingLogic import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import scala.annotation.tailrec +import scala.annotation.{ tailrec, varargs } import scala.collection.immutable +import scala.collection.JavaConverters._ object ClusterRouterGroupSettings { + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def apply(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterGroupSettings = + ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet) + + @varargs + def apply(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterGroupSettings = + ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet) + + // For backwards compatibility, useRoles is the combination of use-roles and use-role def fromConfig(config: Config): ClusterRouterGroupSettings = ClusterRouterGroupSettings( totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), routeesPaths = immutableSeq(config.getStringList("routees.paths")), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), - useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) + useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) } /** @@ -46,33 +56,71 @@ final case class ClusterRouterGroupSettings( totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, - useRole: Option[String]) extends ClusterRouterSettingsBase { + useRoles: Set[String]) extends ClusterRouterSettingsBase { + + // For binary compatibility + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def useRole: Option[String] = useRoles.headOption + + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def this(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]) = + this(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet) /** * Java API */ + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) = - this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) + this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, Option(useRole).toSet) + + /** + * Java API + */ + def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRoles: java.util.Set[String]) = + this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, useRoles.asScala.toSet) + + // For binary compatibility + @deprecated("Use constructor with useRoles instead", since = "2.5.4") + def copy(totalInstances: Int = totalInstances, routeesPaths: immutable.Seq[String] = routeesPaths, allowLocalRoutees: Boolean = allowLocalRoutees, useRole: Option[String] = useRole): ClusterRouterGroupSettings = + new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole) if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "") throw new IllegalArgumentException("routeesPaths must be defined") - routeesPaths.foreach(p ⇒ p match { + routeesPaths.foreach { case RelativeActorPath(elements) ⇒ // good - case _ ⇒ + case p ⇒ throw new IllegalArgumentException(s"routeesPaths [$p] is not a valid actor path without address information") - }) + } + def withUseRoles(useRoles: Set[String]): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles) + + @varargs + def withUseRoles(useRoles: String*): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet) + + /** + * Java API + */ + def withUseRoles(useRoles: java.util.Set[String]): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.asScala.toSet) } object ClusterRouterPoolSettings { + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterPoolSettings = + ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet) + + @varargs + def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterPoolSettings = + ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet) + + // For backwards compatibility, useRoles is the combination of use-roles and use-role def fromConfig(config: Config): ClusterRouterPoolSettings = ClusterRouterPoolSettings( totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), - useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) + useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) } /** @@ -85,16 +133,45 @@ final case class ClusterRouterPoolSettings( totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, - useRole: Option[String]) extends ClusterRouterSettingsBase { + useRoles: Set[String]) extends ClusterRouterSettingsBase { + + // For binary compatibility + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def useRole: Option[String] = useRoles.headOption + + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") + def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]) = + this(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet) /** * Java API */ + @deprecated("useRole has been replaced with useRoles", since = "2.5.4") def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) = - this(totalInstances, maxInstancesPerNode, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) + this(totalInstances, maxInstancesPerNode, allowLocalRoutees, Option(useRole).toSet) + + /** + * Java API + */ + def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRoles: java.util.Set[String]) = + this(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.asScala.toSet) + + // For binary compatibility + @deprecated("Use copy with useRoles instead", since = "2.5.4") + def copy(totalInstances: Int = totalInstances, maxInstancesPerNode: Int = maxInstancesPerNode, allowLocalRoutees: Boolean = allowLocalRoutees, useRole: Option[String] = useRole): ClusterRouterPoolSettings = + new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole) if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster pool router must be > 0") + def withUseRoles(useRoles: Set[String]): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles) + + @varargs + def withUseRoles(useRoles: String*): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet) + + /** + * Java API + */ + def withUseRoles(useRoles: java.util.Set[String]): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.asScala.toSet) } /** @@ -125,10 +202,11 @@ private[akka] object ClusterRouterSettingsBase { private[akka] trait ClusterRouterSettingsBase { def totalInstances: Int def allowLocalRoutees: Boolean - def useRole: Option[String] + def useRoles: Set[String] - require(useRole.isEmpty || useRole.get.nonEmpty, "useRole must be either None or non-empty Some wrapped role") require(totalInstances > 0, "totalInstances of cluster router must be > 0") + require(useRoles != null, "useRoles must be non-null") + require(!useRoles.exists(role ⇒ role == null || role.isEmpty), "All roles in useRoles must be non-empty") } /** @@ -141,11 +219,11 @@ private[akka] trait ClusterRouterSettingsBase { final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase { override def paths(system: ActorSystem): immutable.Iterable[String] = - if (settings.allowLocalRoutees && settings.useRole.isDefined) { - if (Cluster(system).selfRoles.contains(settings.useRole.get)) { + if (settings.allowLocalRoutees && settings.useRoles.nonEmpty) { + if (settings.useRoles.subsetOf(Cluster(system).selfRoles)) { settings.routeesPaths } else Nil - } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + } else if (settings.allowLocalRoutees && settings.useRoles.isEmpty) { settings.routeesPaths } else Nil @@ -157,8 +235,8 @@ final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSe override def withFallback(other: RouterConfig): RouterConfig = other match { case ClusterRouterGroup(_: ClusterRouterGroup, _) ⇒ throw new IllegalStateException( "ClusterRouterGroup is not allowed to wrap a ClusterRouterGroup") - case ClusterRouterGroup(local, _) ⇒ - copy(local = this.local.withFallback(local).asInstanceOf[Group]) + case ClusterRouterGroup(otherLocal, _) ⇒ + copy(local = this.local.withFallback(otherLocal).asInstanceOf[Group]) case _ ⇒ copy(local = this.local.withFallback(other).asInstanceOf[Group]) } @@ -192,11 +270,11 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti * Initial number of routee instances */ override def nrOfInstances(sys: ActorSystem): Int = - if (settings.allowLocalRoutees && settings.useRole.isDefined) { - if (Cluster(sys).selfRoles.contains(settings.useRole.get)) { + if (settings.allowLocalRoutees && settings.useRoles.nonEmpty) { + if (settings.useRoles.subsetOf(Cluster(sys).selfRoles)) { settings.maxInstancesPerNode } else 0 - } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + } else if (settings.allowLocalRoutees && settings.useRoles.isEmpty) { settings.maxInstancesPerNode } else 0 @@ -234,7 +312,7 @@ private[akka] trait ClusterRouterConfigBase extends RouterConfig { // Intercept ClusterDomainEvent and route them to the ClusterRouterActor override def isManagementMessage(msg: Any): Boolean = - (msg.isInstanceOf[ClusterDomainEvent]) || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg) + msg.isInstanceOf[ClusterDomainEvent] || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg) } /** @@ -383,17 +461,14 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ def isAvailable(m: Member): Boolean = (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) && - satisfiesRole(m.roles) && + satisfiesRoles(m.roles) && (settings.allowLocalRoutees || m.address != cluster.selfAddress) - private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match { - case None ⇒ true - case Some(r) ⇒ memberRoles.contains(r) - } + private def satisfiesRoles(memberRoles: Set[String]): Boolean = settings.useRoles.subsetOf(memberRoles) def availableNodes: immutable.SortedSet[Address] = { import akka.cluster.Member.addressOrdering - if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) + if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRoles(cluster.selfRoles)) // use my own node, cluster information not updated yet immutable.SortedSet(cluster.selfAddress) else @@ -404,11 +479,11 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ * Fills in self address for local ActorRef */ def fullAddress(routee: Routee): Address = { - val a = routee match { + val address = routee match { case ActorRefRoutee(ref) ⇒ ref.path.address case ActorSelectionRoutee(sel) ⇒ sel.anchor.path.address } - a match { + address match { case Address(_, _, None, None) ⇒ cluster.selfAddress case a ⇒ a } @@ -457,5 +532,4 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ case ReachableMember(m) ⇒ if (isAvailable(m)) addMember(m) } -} - +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala index f17bd01ce7..f919644b0a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala @@ -76,7 +76,7 @@ abstract class ClusterConsistentHashingGroupSpec extends MultiNodeSpec(ClusterCo val router = system.actorOf( ClusterRouterGroup( local = ConsistentHashingGroup(paths, hashMapping = hashMapping), - settings = ClusterRouterGroupSettings(totalInstances = 10, paths, allowLocalRoutees = true, useRole = None)).props(), + settings = ClusterRouterGroupSettings(totalInstances = 10, paths, allowLocalRoutees = true)).props(), "router") // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size should ===(3) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index 7237b1c94a..de985cd6c3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -124,7 +124,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC val router2 = system.actorOf( ClusterRouterPool( local = ConsistentHashingPool(nrOfInstances = 0), - settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = None)). + settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true)). props(Props[Echo]), "router2") // it may take some time until router receives cluster member events @@ -159,7 +159,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC val router4 = system.actorOf( ClusterRouterPool( local = ConsistentHashingPool(nrOfInstances = 0, hashMapping = hashMapping), - settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). + settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true)). props(Props[Echo]), "router4") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala index c867015686..9e40cc866a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala @@ -85,7 +85,7 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig { router = round-robin-pool cluster { enabled = on - use-role = a + use-roles = ["a"] max-total-nr-of-instances = 10 } } @@ -115,7 +115,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult lazy val router2 = system.actorOf( ClusterRouterPool( RoundRobinPool(nrOfInstances = 0), - ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). + ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true)). props(Props[SomeActor]), "router2") lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala index 0816429683..ecee9c051b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala @@ -99,12 +99,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("b") + val roles = Set("b") val router = system.actorOf( ClusterRouterPool( RoundRobinPool(nrOfInstances = 6), - ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRole = role)). + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRoles = roles)). props(Props[SomeActor]), "router-2") @@ -129,13 +129,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("b") + val roles = Set("b") val router = system.actorOf( ClusterRouterGroup( RoundRobinGroup(paths = Nil), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), - allowLocalRoutees = false, useRole = role)).props, + allowLocalRoutees = false, useRoles = roles)).props, "router-2b") awaitAssert(currentRoutees(router).size should ===(4)) @@ -159,12 +159,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("b") + val roles = Set("b") val router = system.actorOf( ClusterRouterPool( RoundRobinPool(nrOfInstances = 6), - ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)). props(Props[SomeActor]), "router-3") @@ -189,13 +189,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("b") + val roles = Set("b") val router = system.actorOf( ClusterRouterGroup( RoundRobinGroup(paths = Nil), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), - allowLocalRoutees = true, useRole = role)).props, + allowLocalRoutees = true, useRoles = roles)).props, "router-3b") awaitAssert(currentRoutees(router).size should ===(4)) @@ -219,12 +219,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { runOn(first) { - val role = Some("a") + val roles = Set("a") val router = system.actorOf( ClusterRouterPool( RoundRobinPool(nrOfInstances = 6), - ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)). props(Props[SomeActor]), "router-4") @@ -249,13 +249,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { runOn(first) { - val role = Some("a") + val roles = Set("a") val router = system.actorOf( ClusterRouterGroup( RoundRobinGroup(paths = Nil), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), - allowLocalRoutees = true, useRole = role)).props, + allowLocalRoutees = true, useRoles = roles)).props, "router-4b") awaitAssert(currentRoutees(router).size should ===(2)) @@ -279,12 +279,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("c") + val roles = Set("c") val router = system.actorOf( ClusterRouterPool( RoundRobinPool(nrOfInstances = 6), - ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)). props(Props[SomeActor]), "router-5") @@ -309,13 +309,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp "group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { runOn(first) { - val role = Some("c") + val roles = Set("c") val router = system.actorOf( ClusterRouterGroup( RoundRobinGroup(paths = Nil), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), - allowLocalRoutees = true, useRole = role)).props, + allowLocalRoutees = true, useRoles = roles)).props, "router-5b") awaitAssert(currentRoutees(router).size should ===(6)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 60db31e27d..b03376e559 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -57,7 +57,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { service, deployment.get.config, ClusterRouterPool(RoundRobinPool(20), ClusterRouterPoolSettings( - totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), + totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false)), ClusterScope, Deploy.NoDispatcherGiven, Deploy.NoMailboxGiven))) @@ -73,7 +73,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { service, deployment.get.config, ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings( - totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false, useRole = None)), + totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false)), ClusterScope, "mydispatcher", "mymailbox"))) diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 4bc6c998f0..844b36c7de 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -4,12 +4,12 @@ package akka.cluster.protobuf import akka.cluster._ -import akka.actor.{ Address, ExtendedActorSystem } +import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } -import akka.routing.{ DefaultOptimalSizeExploringResizer, RoundRobinPool } +import akka.routing.RoundRobinPool import collection.immutable.SortedSet -import akka.testkit.AkkaSpec +import akka.testkit.{ AkkaSpec, TestKit } class ClusterMessageSerializerSpec extends AkkaSpec( "akka.actor.provider = cluster") { @@ -75,9 +75,57 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) } + + "be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in { + val system = ActorSystem("ClusterMessageSerializer-old-wire-format") + + try { + val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) + + // the oldSnapshot was created with the version of ClusterRouterPoolSettings in Akka 2.5.3. See issue #23257. + // It was created with: + /* + import org.apache.commons.codec.binary.Hex.encodeHex + val bytes = serializer.toBinary( + ClusterRouterPool(RoundRobinPool(nrOfInstances = 4), ClusterRouterPoolSettings(123, 345, true, Some("role ABC")))) + println(String.valueOf(encodeHex(bytes))) + */ + + val oldBytesHex = "0a0f08101205524f5252501a04080418001211087b10d90218012208726f6c6520414243" + + import org.apache.commons.codec.binary.Hex.decodeHex + val oldBytes = decodeHex(oldBytesHex.toCharArray) + val result = serializer.fromBinary(oldBytes, classOf[ClusterRouterPool]) + + result match { + case pool: ClusterRouterPool ⇒ + pool.settings.totalInstances should ===(123) + pool.settings.maxInstancesPerNode should ===(345) + pool.settings.allowLocalRoutees should ===(true) + pool.settings.useRole should ===(Some("role ABC")) + pool.settings.useRoles should ===(Set("role ABC")) + } + } finally { + TestKit.shutdownActorSystem(system) + } + + } } "Cluster router pool" must { - "be serializable" in { + "be serializable with no role" in { + checkSerialization(ClusterRouterPool( + RoundRobinPool( + nrOfInstances = 4 + ), + ClusterRouterPoolSettings( + totalInstances = 2, + maxInstancesPerNode = 5, + allowLocalRoutees = true + ) + )) + } + + "be serializable with one role" in { checkSerialization(ClusterRouterPool( RoundRobinPool( nrOfInstances = 4 @@ -86,7 +134,21 @@ class ClusterMessageSerializerSpec extends AkkaSpec( totalInstances = 2, maxInstancesPerNode = 5, allowLocalRoutees = true, - useRole = Some("Richard, Duke of Gloucester") + useRoles = Set("Richard, Duke of Gloucester") + ) + )) + } + + "be serializable with many roles" in { + checkSerialization(ClusterRouterPool( + RoundRobinPool( + nrOfInstances = 4 + ), + ClusterRouterPoolSettings( + totalInstances = 2, + maxInstancesPerNode = 5, + allowLocalRoutees = true, + useRoles = Set("Richard, Duke of Gloucester", "Hongzhi Emperor", "Red Rackham") ) )) } diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala index 6f36ed573f..75b6f6d24f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala @@ -41,8 +41,7 @@ class ClusterRouterSupervisorSpec extends AkkaSpec(""" }), ClusterRouterPoolSettings( totalInstances = 1, maxInstancesPerNode = 1, - allowLocalRoutees = true, - useRole = None)). + allowLocalRoutees = true)). props(Props(classOf[KillableActor], testActor)), name = "therouter") router ! "go away" diff --git a/akka-docs/src/main/paradox/scala/cluster-metrics.md b/akka-docs/src/main/paradox/scala/cluster-metrics.md index 45ea7cb6d1..9781d7d188 100644 --- a/akka-docs/src/main/paradox/scala/cluster-metrics.md +++ b/akka-docs/src/main/paradox/scala/cluster-metrics.md @@ -185,7 +185,7 @@ akka.actor.deployment { routees.paths = ["/user/factorialBackend"] cluster { enabled = on - use-role = backend + use-roles = ["backend"] allow-local-routees = off } } diff --git a/akka-docs/src/main/paradox/scala/cluster-usage.md b/akka-docs/src/main/paradox/scala/cluster-usage.md index 3fca5a411d..d9467b3540 100644 --- a/akka-docs/src/main/paradox/scala/cluster-usage.md +++ b/akka-docs/src/main/paradox/scala/cluster-usage.md @@ -606,7 +606,7 @@ akka.actor.deployment { cluster { enabled = on allow-local-routees = on - use-role = compute + use-roles = ["compute"] } } } @@ -622,7 +622,7 @@ the router will try to use them as soon as the member status is changed to 'Up'. The actor paths without address information that are defined in `routees.paths` are used for selecting the actors to which the messages will be forwarded to by the router. Messages will be forwarded to the routees using @ref:[ActorSelection](actors.md#actorselection), so the same delivery semantics should be expected. -It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying `use-role`. +It is possible to limit the lookup of routees to member nodes tagged with a particular set of roles by specifying `use-roles`. `max-total-nr-of-instances` defines total number of routees in the cluster. By default `max-total-nr-of-instances` is set to a high value (10000) that will result in new routees added to the router when nodes join the cluster. @@ -693,7 +693,7 @@ akka.actor.deployment { cluster { enabled = on allow-local-routees = on - use-role = compute + use-roles = ["compute"] } } } @@ -722,14 +722,14 @@ akka.actor.deployment { enabled = on max-nr-of-instances-per-node = 3 allow-local-routees = on - use-role = compute + use-roles = ["compute"] } } } ``` -It is possible to limit the deployment of routees to member nodes tagged with a certain role by -specifying `use-role`. +It is possible to limit the deployment of routees to member nodes tagged with a particular set of roles by +specifying `use-roles`. `max-total-nr-of-instances` defines total number of routees in the cluster, but the number of routees per node, `max-nr-of-instances-per-node`, will not be exceeded. By default `max-total-nr-of-instances` @@ -797,7 +797,7 @@ akka.actor.deployment { enabled = on max-nr-of-instances-per-node = 3 allow-local-routees = on - use-role = compute + use-roles = ["compute"] } } } diff --git a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md index 82edd196d5..7ed62784f8 100644 --- a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md +++ b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md @@ -420,6 +420,14 @@ and here is a summary of things to consider. * [mig25_weaklyup](#mig25-weaklyup) * [mig25_sharding_store](#mig25-sharding-store) * [mig25_mutual](#mig25-mutual) + +#### Limit lookup of routees to nodes tagged with multiple roles + +Starting with 2.5.4, cluster routing supports delivering messages to routees tagged with all specified roles +using `use-roles` (instead of `use-role` in previous versions). When doing rolling upgrades and using this new feature, +it is important to first upgrade the existing nodes to the latest version of Akka +and then start using multiple roles in a separate rolling upgrade. Otherwise, if a new node sends a message +with the restriction `use-roles = ["a", "b"]`, that will only require the "a" role on old nodes. ### Coordinated Shutdown diff --git a/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java b/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java index 2d35d43718..34f7d05429 100644 --- a/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java +++ b/akka-docs/src/test/java/jdocs/cluster/FactorialFrontend.java @@ -2,6 +2,8 @@ package jdocs.cluster; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import akka.actor.Props; @@ -77,12 +79,12 @@ abstract class FactorialFrontend2 extends AbstractActor { int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/factorialBackend", ""); boolean allowLocalRoutees = true; - String useRole = "backend"; + Set useRoles = new HashSet<>(Arrays.asList("backend")); ActorRef backend = getContext().actorOf( new ClusterRouterGroup(new AdaptiveLoadBalancingGroup( HeapMetricsSelector.getInstance(), Collections. emptyList()), new ClusterRouterGroupSettings(totalInstances, routeesPaths, - allowLocalRoutees, useRole)).props(), "factorialBackendRouter2"); + allowLocalRoutees, useRoles)).props(), "factorialBackendRouter2"); //#router-lookup-in-code } @@ -93,12 +95,12 @@ abstract class FactorialFrontend3 extends AbstractActor { int totalInstances = 100; int maxInstancesPerNode = 3; boolean allowLocalRoutees = false; - String useRole = "backend"; + Set useRoles = new HashSet<>(Arrays.asList("backend")); ActorRef backend = getContext().actorOf( new ClusterRouterPool(new AdaptiveLoadBalancingPool( SystemLoadAverageMetricsSelector.getInstance(), 0), new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, - allowLocalRoutees, useRole)).props(Props + allowLocalRoutees, useRoles)).props(Props .create(FactorialBackend.class)), "factorialBackendRouter3"); //#router-deploy-in-code } diff --git a/akka-docs/src/test/java/jdocs/cluster/StatsService.java b/akka-docs/src/test/java/jdocs/cluster/StatsService.java index 61509307c1..851522b170 100644 --- a/akka-docs/src/test/java/jdocs/cluster/StatsService.java +++ b/akka-docs/src/test/java/jdocs/cluster/StatsService.java @@ -13,7 +13,10 @@ import akka.actor.AbstractActor; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; import akka.routing.FromConfig; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; //#service public class StatsService extends AbstractActor { @@ -55,11 +58,11 @@ abstract class StatsService2 extends AbstractActor { Iterable routeesPaths = Collections .singletonList("/user/statsWorker"); boolean allowLocalRoutees = true; - String useRole = "compute"; + Set useRoles = new HashSet<>(Arrays.asList("compute")); ActorRef workerRouter = getContext().actorOf( new ClusterRouterGroup(new ConsistentHashingGroup(routeesPaths), new ClusterRouterGroupSettings(totalInstances, routeesPaths, - allowLocalRoutees, useRole)).props(), "workerRouter2"); + allowLocalRoutees, useRoles)).props(), "workerRouter2"); //#router-lookup-in-code } @@ -69,11 +72,11 @@ abstract class StatsService3 extends AbstractActor { int totalInstances = 100; int maxInstancesPerNode = 3; boolean allowLocalRoutees = false; - String useRole = "compute"; + Set useRoles = new HashSet<>(Arrays.asList("compute")); ActorRef workerRouter = getContext().actorOf( new ClusterRouterPool(new ConsistentHashingPool(0), new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, - allowLocalRoutees, useRole)).props(Props + allowLocalRoutees, useRoles)).props(Props .create(StatsWorker.class)), "workerRouter3"); //#router-deploy-in-code } diff --git a/akka-docs/src/test/scala/docs/cluster/FactorialFrontend.scala b/akka-docs/src/test/scala/docs/cluster/FactorialFrontend.scala index 897c331e65..aa1db19db6 100644 --- a/akka-docs/src/test/scala/docs/cluster/FactorialFrontend.scala +++ b/akka-docs/src/test/scala/docs/cluster/FactorialFrontend.scala @@ -78,7 +78,7 @@ abstract class FactorialFrontend2 extends Actor { AdaptiveLoadBalancingGroup(HeapMetricsSelector), ClusterRouterGroupSettings( totalInstances = 100, routeesPaths = List("/user/factorialBackend"), - allowLocalRoutees = true, useRole = Some("backend"))).props(), + allowLocalRoutees = true, useRoles = Set("backend"))).props(), name = "factorialBackendRouter2") //#router-lookup-in-code @@ -96,7 +96,7 @@ abstract class FactorialFrontend3 extends Actor { ClusterRouterPool(AdaptiveLoadBalancingPool( SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings( totalInstances = 100, maxInstancesPerNode = 3, - allowLocalRoutees = false, useRole = Some("backend"))).props(Props[FactorialBackend]), + allowLocalRoutees = false, useRoles = Set("backend"))).props(Props[FactorialBackend]), name = "factorialBackendRouter3") //#router-deploy-in-code }