Replace ClusterRouterGroup/Pool "use-role" with "use-role-set" #23496

This commit is contained in:
Sébastien Lorion 2017-08-09 16:06:18 +02:00 committed by Johan Andrén
parent 5477a6f92d
commit a95a94acff
22 changed files with 445 additions and 96 deletions

View file

@ -138,7 +138,7 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
val router = system.actorOf( val router = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
local = AdaptiveLoadBalancingPool(HeapMetricsSelector), local = AdaptiveLoadBalancingPool(HeapMetricsSelector),
settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true)).
props(Props[Echo]), props(Props[Echo]),
name) name)
// it may take some time until router receives cluster member events // it may take some time until router receives cluster member events

View file

@ -45,7 +45,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
cluster { cluster {
enabled = on enabled = on
allow-local-routees = on allow-local-routees = on
use-role = compute use-roles = ["compute"]
} }
} }
} }

View file

@ -57,7 +57,7 @@ abstract class StatsService2 extends Actor {
val workerRouter = context.actorOf( val workerRouter = context.actorOf(
ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings( ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
totalInstances = 100, routeesPaths = List("/user/statsWorker"), totalInstances = 100, routeesPaths = List("/user/statsWorker"),
allowLocalRoutees = true, useRole = Some("compute"))).props(), allowLocalRoutees = true, useRoles = Set("compute"))).props(),
name = "workerRouter2") name = "workerRouter2")
//#router-lookup-in-code //#router-lookup-in-code
} }
@ -71,7 +71,7 @@ abstract class StatsService3 extends Actor {
val workerRouter = context.actorOf( val workerRouter = context.actorOf(
ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings( ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings(
totalInstances = 100, maxInstancesPerNode = 3, totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false, useRole = None)).props(Props[StatsWorker]), allowLocalRoutees = false)).props(Props[StatsWorker]),
name = "workerRouter3") name = "workerRouter3")
//#router-deploy-in-code //#router-deploy-in-code
} }

View file

@ -13149,6 +13149,26 @@ public final class ClusterMessages {
*/ */
akka.protobuf.ByteString akka.protobuf.ByteString
getUseRoleBytes(); getUseRoleBytes();
// repeated string useRoles = 5;
/**
* <code>repeated string useRoles = 5;</code>
*/
java.util.List<java.lang.String>
getUseRolesList();
/**
* <code>repeated string useRoles = 5;</code>
*/
int getUseRolesCount();
/**
* <code>repeated string useRoles = 5;</code>
*/
java.lang.String getUseRoles(int index);
/**
* <code>repeated string useRoles = 5;</code>
*/
akka.protobuf.ByteString
getUseRolesBytes(int index);
} }
/** /**
* Protobuf type {@code ClusterRouterPoolSettings} * Protobuf type {@code ClusterRouterPoolSettings}
@ -13221,6 +13241,14 @@ public final class ClusterMessages {
useRole_ = input.readBytes(); useRole_ = input.readBytes();
break; break;
} }
case 42: {
if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
useRoles_ = new akka.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000010;
}
useRoles_.add(input.readBytes());
break;
}
} }
} }
} catch (akka.protobuf.InvalidProtocolBufferException e) { } catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -13229,6 +13257,9 @@ public final class ClusterMessages {
throw new akka.protobuf.InvalidProtocolBufferException( throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this); e.getMessage()).setUnfinishedMessage(this);
} finally { } finally {
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
useRoles_ = new akka.protobuf.UnmodifiableLazyStringList(useRoles_);
}
this.unknownFields = unknownFields.build(); this.unknownFields = unknownFields.build();
makeExtensionsImmutable(); makeExtensionsImmutable();
} }
@ -13352,11 +13383,42 @@ public final class ClusterMessages {
} }
} }
// repeated string useRoles = 5;
public static final int USEROLES_FIELD_NUMBER = 5;
private akka.protobuf.LazyStringList useRoles_;
/**
* <code>repeated string useRoles = 5;</code>
*/
public java.util.List<java.lang.String>
getUseRolesList() {
return useRoles_;
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public int getUseRolesCount() {
return useRoles_.size();
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public java.lang.String getUseRoles(int index) {
return useRoles_.get(index);
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public akka.protobuf.ByteString
getUseRolesBytes(int index) {
return useRoles_.getByteString(index);
}
private void initFields() { private void initFields() {
totalInstances_ = 0; totalInstances_ = 0;
maxInstancesPerNode_ = 0; maxInstancesPerNode_ = 0;
allowLocalRoutees_ = false; allowLocalRoutees_ = false;
useRole_ = ""; useRole_ = "";
useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -13394,6 +13456,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000008) == 0x00000008)) { if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(4, getUseRoleBytes()); output.writeBytes(4, getUseRoleBytes());
} }
for (int i = 0; i < useRoles_.size(); i++) {
output.writeBytes(5, useRoles_.getByteString(i));
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -13419,6 +13484,15 @@ public final class ClusterMessages {
size += akka.protobuf.CodedOutputStream size += akka.protobuf.CodedOutputStream
.computeBytesSize(4, getUseRoleBytes()); .computeBytesSize(4, getUseRoleBytes());
} }
{
int dataSize = 0;
for (int i = 0; i < useRoles_.size(); i++) {
dataSize += akka.protobuf.CodedOutputStream
.computeBytesSizeNoTag(useRoles_.getByteString(i));
}
size += dataSize;
size += 1 * getUseRolesList().size();
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -13543,6 +13617,8 @@ public final class ClusterMessages {
bitField0_ = (bitField0_ & ~0x00000004); bitField0_ = (bitField0_ & ~0x00000004);
useRole_ = ""; useRole_ = "";
bitField0_ = (bitField0_ & ~0x00000008); bitField0_ = (bitField0_ & ~0x00000008);
useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
return this; return this;
} }
@ -13587,6 +13663,12 @@ public final class ClusterMessages {
to_bitField0_ |= 0x00000008; to_bitField0_ |= 0x00000008;
} }
result.useRole_ = useRole_; result.useRole_ = useRole_;
if (((bitField0_ & 0x00000010) == 0x00000010)) {
useRoles_ = new akka.protobuf.UnmodifiableLazyStringList(
useRoles_);
bitField0_ = (bitField0_ & ~0x00000010);
}
result.useRoles_ = useRoles_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -13617,6 +13699,16 @@ public final class ClusterMessages {
useRole_ = other.useRole_; useRole_ = other.useRole_;
onChanged(); onChanged();
} }
if (!other.useRoles_.isEmpty()) {
if (useRoles_.isEmpty()) {
useRoles_ = other.useRoles_;
bitField0_ = (bitField0_ & ~0x00000010);
} else {
ensureUseRolesIsMutable();
useRoles_.addAll(other.useRoles_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -13829,6 +13921,99 @@ public final class ClusterMessages {
return this; return this;
} }
// repeated string useRoles = 5;
private akka.protobuf.LazyStringList useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY;
private void ensureUseRolesIsMutable() {
if (!((bitField0_ & 0x00000010) == 0x00000010)) {
useRoles_ = new akka.protobuf.LazyStringArrayList(useRoles_);
bitField0_ |= 0x00000010;
}
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public java.util.List<java.lang.String>
getUseRolesList() {
return java.util.Collections.unmodifiableList(useRoles_);
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public int getUseRolesCount() {
return useRoles_.size();
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public java.lang.String getUseRoles(int index) {
return useRoles_.get(index);
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public akka.protobuf.ByteString
getUseRolesBytes(int index) {
return useRoles_.getByteString(index);
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public Builder setUseRoles(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureUseRolesIsMutable();
useRoles_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public Builder addUseRoles(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureUseRolesIsMutable();
useRoles_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public Builder addAllUseRoles(
java.lang.Iterable<java.lang.String> values) {
ensureUseRolesIsMutable();
super.addAll(values, useRoles_);
onChanged();
return this;
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public Builder clearUseRoles() {
useRoles_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
onChanged();
return this;
}
/**
* <code>repeated string useRoles = 5;</code>
*/
public Builder addUseRolesBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureUseRolesIsMutable();
useRoles_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ClusterRouterPoolSettings) // @@protoc_insertion_point(builder_scope:ClusterRouterPoolSettings)
} }
@ -13967,15 +14152,16 @@ public final class ClusterMessages {
"\001(\r\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001 \002(\0132\005" + "\001(\r\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001 \002(\0132\005" +
".Pool\022,\n\010settings\030\002 \002(\0132\032.ClusterRouterP" + ".Pool\022,\n\010settings\030\002 \002(\0132\032.ClusterRouterP" +
"oolSettings\"<\n\004Pool\022\024\n\014serializerId\030\001 \002(" + "oolSettings\"<\n\004Pool\022\024\n\014serializerId\030\001 \002(" +
"\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"|\n\031Clu" + "\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"\216\001\n\031Cl" +
"sterRouterPoolSettings\022\026\n\016totalInstances" + "usterRouterPoolSettings\022\026\n\016totalInstance" +
"\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021a" + "s\030\001 \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021" +
"llowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t*" + "allowLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t" +
"D\n\022ReachabilityStatus\022\r\n\tReachable\020\000\022\017\n\013" + "\022\020\n\010useRoles\030\005 \003(\t*D\n\022ReachabilityStatus" +
"Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014MemberS" + "\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTerm" +
"tatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022", "inated\020\002*b\n\014MemberStatus\022\013\n\007Joining\020\000\022\006\n",
"\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010W" + "\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020" +
"eaklyUp\020\006B\035\n\031akka.cluster.protobuf.msgH\001" "\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.clu" +
"ster.protobuf.msgH\001"
}; };
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -14083,7 +14269,7 @@ public final class ClusterMessages {
internal_static_ClusterRouterPoolSettings_fieldAccessorTable = new internal_static_ClusterRouterPoolSettings_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable( akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ClusterRouterPoolSettings_descriptor, internal_static_ClusterRouterPoolSettings_descriptor,
new java.lang.String[] { "TotalInstances", "MaxInstancesPerNode", "AllowLocalRoutees", "UseRole", }); new java.lang.String[] { "TotalInstances", "MaxInstancesPerNode", "AllowLocalRoutees", "UseRole", "UseRoles", });
return null; return null;
} }
}; };

View file

@ -0,0 +1,7 @@
# #23257 replace ClusterRouterGroup/Pool "use-role" with "use-roles"
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRoles")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesBytes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesCount")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#ClusterRouterPoolSettingsOrBuilder.getUseRolesList")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.routing.ClusterRouterSettingsBase.useRole")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.routing.ClusterRouterSettingsBase.useRoles")

View file

@ -223,4 +223,5 @@ message UniqueAddress {
required uint32 maxInstancesPerNode = 2; required uint32 maxInstancesPerNode = 2;
required bool allowLocalRoutees = 3; required bool allowLocalRoutees = 3;
optional string useRole = 4; optional string useRole = 4;
repeated string useRoles = 5;
} }

View file

@ -234,9 +234,12 @@ akka {
# Useful for master-worker scenario where all routees are remote. # Useful for master-worker scenario where all routees are remote.
allow-local-routees = on allow-local-routees = on
# Use members with all specified roles, or all members if undefined or empty.
use-roles = []
# Deprecated, since Akka 2.5.4, replaced by use-roles
# Use members with specified role, or all members if undefined or empty. # Use members with specified role, or all members if undefined or empty.
use-role = "" use-role = ""
} }
# Protobuf serializer for cluster messages # Protobuf serializer for cluster messages

View file

@ -13,8 +13,8 @@ import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWi
import akka.protobuf.{ ByteString, MessageLite } import akka.protobuf.{ ByteString, MessageLite }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.concurrent.duration.Deadline import scala.concurrent.duration.Deadline
import java.io.NotSerializableException import java.io.NotSerializableException
@ -166,8 +166,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
builder.setAllowLocalRoutees(settings.allowLocalRoutees) builder.setAllowLocalRoutees(settings.allowLocalRoutees)
.setMaxInstancesPerNode(settings.maxInstancesPerNode) .setMaxInstancesPerNode(settings.maxInstancesPerNode)
.setTotalInstances(settings.totalInstances) .setTotalInstances(settings.totalInstances)
.addAllUseRoles(settings.useRoles.asJava)
// for backwards compatibility
settings.useRole.foreach(builder.setUseRole) settings.useRole.foreach(builder.setUseRole)
builder.build() builder.build()
} }
@ -378,11 +381,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
} }
private def clusterRouterPoolSettingsFromProto(crps: cm.ClusterRouterPoolSettings): ClusterRouterPoolSettings = { private def clusterRouterPoolSettingsFromProto(crps: cm.ClusterRouterPoolSettings): ClusterRouterPoolSettings = {
// For backwards compatibility, useRoles is the combination of getUseRole and getUseRolesList
ClusterRouterPoolSettings( ClusterRouterPoolSettings(
totalInstances = crps.getTotalInstances, totalInstances = crps.getTotalInstances,
maxInstancesPerNode = crps.getMaxInstancesPerNode, maxInstancesPerNode = crps.getMaxInstancesPerNode,
allowLocalRoutees = crps.getAllowLocalRoutees, allowLocalRoutees = crps.getAllowLocalRoutees,
useRole = if (crps.hasUseRole) Some(crps.getUseRole) else None useRoles = if (crps.hasUseRole) { crps.getUseRolesList.asScala.toSet + crps.getUseRole } else { crps.getUseRolesList.asScala.toSet }
) )
} }

View file

@ -26,16 +26,26 @@ import akka.routing.RoutingLogic
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.annotation.tailrec import scala.annotation.{ tailrec, varargs }
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._
object ClusterRouterGroupSettings { object ClusterRouterGroupSettings {
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def apply(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterGroupSettings =
ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet)
@varargs
def apply(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterGroupSettings =
ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet)
// For backwards compatibility, useRoles is the combination of use-roles and use-role
def fromConfig(config: Config): ClusterRouterGroupSettings = def fromConfig(config: Config): ClusterRouterGroupSettings =
ClusterRouterGroupSettings( ClusterRouterGroupSettings(
totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config),
routeesPaths = immutableSeq(config.getStringList("routees.paths")), routeesPaths = immutableSeq(config.getStringList("routees.paths")),
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role")))
} }
/** /**
@ -46,33 +56,71 @@ final case class ClusterRouterGroupSettings(
totalInstances: Int, totalInstances: Int,
routeesPaths: immutable.Seq[String], routeesPaths: immutable.Seq[String],
allowLocalRoutees: Boolean, allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase { useRoles: Set[String]) extends ClusterRouterSettingsBase {
// For binary compatibility
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def useRole: Option[String] = useRoles.headOption
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def this(totalInstances: Int, routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]) =
this(totalInstances, routeesPaths, allowLocalRoutees, useRole.toSet)
/** /**
* Java API * Java API
*/ */
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) = def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, Option(useRole).toSet)
/**
* Java API
*/
def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRoles: java.util.Set[String]) =
this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, useRoles.asScala.toSet)
// For binary compatibility
@deprecated("Use constructor with useRoles instead", since = "2.5.4")
def copy(totalInstances: Int = totalInstances, routeesPaths: immutable.Seq[String] = routeesPaths, allowLocalRoutees: Boolean = allowLocalRoutees, useRole: Option[String] = useRole): ClusterRouterGroupSettings =
new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRole)
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "") if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "")
throw new IllegalArgumentException("routeesPaths must be defined") throw new IllegalArgumentException("routeesPaths must be defined")
routeesPaths.foreach(p p match { routeesPaths.foreach {
case RelativeActorPath(elements) // good case RelativeActorPath(elements) // good
case _ case p
throw new IllegalArgumentException(s"routeesPaths [$p] is not a valid actor path without address information") throw new IllegalArgumentException(s"routeesPaths [$p] is not a valid actor path without address information")
}) }
def withUseRoles(useRoles: Set[String]): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles)
@varargs
def withUseRoles(useRoles: String*): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.toSet)
/**
* Java API
*/
def withUseRoles(useRoles: java.util.Set[String]): ClusterRouterGroupSettings = new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles.asScala.toSet)
} }
object ClusterRouterPoolSettings { object ClusterRouterPoolSettings {
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterPoolSettings =
ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet)
@varargs
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRoles: String*): ClusterRouterPoolSettings =
ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet)
// For backwards compatibility, useRoles is the combination of use-roles and use-role
def fromConfig(config: Config): ClusterRouterPoolSettings = def fromConfig(config: Config): ClusterRouterPoolSettings =
ClusterRouterPoolSettings( ClusterRouterPoolSettings(
totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config), totalInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config),
maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"), maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"),
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) useRoles = config.getStringList("cluster.use-roles").asScala.toSet ++ ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role")))
} }
/** /**
@ -85,16 +133,45 @@ final case class ClusterRouterPoolSettings(
totalInstances: Int, totalInstances: Int,
maxInstancesPerNode: Int, maxInstancesPerNode: Int,
allowLocalRoutees: Boolean, allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase { useRoles: Set[String]) extends ClusterRouterSettingsBase {
// For binary compatibility
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def useRole: Option[String] = useRoles.headOption
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]) =
this(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole.toSet)
/** /**
* Java API * Java API
*/ */
@deprecated("useRole has been replaced with useRoles", since = "2.5.4")
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) = def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) this(totalInstances, maxInstancesPerNode, allowLocalRoutees, Option(useRole).toSet)
/**
* Java API
*/
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRoles: java.util.Set[String]) =
this(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.asScala.toSet)
// For binary compatibility
@deprecated("Use copy with useRoles instead", since = "2.5.4")
def copy(totalInstances: Int = totalInstances, maxInstancesPerNode: Int = maxInstancesPerNode, allowLocalRoutees: Boolean = allowLocalRoutees, useRole: Option[String] = useRole): ClusterRouterPoolSettings =
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole)
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster pool router must be > 0") if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster pool router must be > 0")
def withUseRoles(useRoles: Set[String]): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles)
@varargs
def withUseRoles(useRoles: String*): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.toSet)
/**
* Java API
*/
def withUseRoles(useRoles: java.util.Set[String]): ClusterRouterPoolSettings = new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles.asScala.toSet)
} }
/** /**
@ -125,10 +202,11 @@ private[akka] object ClusterRouterSettingsBase {
private[akka] trait ClusterRouterSettingsBase { private[akka] trait ClusterRouterSettingsBase {
def totalInstances: Int def totalInstances: Int
def allowLocalRoutees: Boolean def allowLocalRoutees: Boolean
def useRole: Option[String] def useRoles: Set[String]
require(useRole.isEmpty || useRole.get.nonEmpty, "useRole must be either None or non-empty Some wrapped role")
require(totalInstances > 0, "totalInstances of cluster router must be > 0") require(totalInstances > 0, "totalInstances of cluster router must be > 0")
require(useRoles != null, "useRoles must be non-null")
require(!useRoles.exists(role role == null || role.isEmpty), "All roles in useRoles must be non-empty")
} }
/** /**
@ -141,11 +219,11 @@ private[akka] trait ClusterRouterSettingsBase {
final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase { final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase {
override def paths(system: ActorSystem): immutable.Iterable[String] = override def paths(system: ActorSystem): immutable.Iterable[String] =
if (settings.allowLocalRoutees && settings.useRole.isDefined) { if (settings.allowLocalRoutees && settings.useRoles.nonEmpty) {
if (Cluster(system).selfRoles.contains(settings.useRole.get)) { if (settings.useRoles.subsetOf(Cluster(system).selfRoles)) {
settings.routeesPaths settings.routeesPaths
} else Nil } else Nil
} else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { } else if (settings.allowLocalRoutees && settings.useRoles.isEmpty) {
settings.routeesPaths settings.routeesPaths
} else Nil } else Nil
@ -157,8 +235,8 @@ final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSe
override def withFallback(other: RouterConfig): RouterConfig = other match { override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterGroup(_: ClusterRouterGroup, _) throw new IllegalStateException( case ClusterRouterGroup(_: ClusterRouterGroup, _) throw new IllegalStateException(
"ClusterRouterGroup is not allowed to wrap a ClusterRouterGroup") "ClusterRouterGroup is not allowed to wrap a ClusterRouterGroup")
case ClusterRouterGroup(local, _) case ClusterRouterGroup(otherLocal, _)
copy(local = this.local.withFallback(local).asInstanceOf[Group]) copy(local = this.local.withFallback(otherLocal).asInstanceOf[Group])
case _ case _
copy(local = this.local.withFallback(other).asInstanceOf[Group]) copy(local = this.local.withFallback(other).asInstanceOf[Group])
} }
@ -192,11 +270,11 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti
* Initial number of routee instances * Initial number of routee instances
*/ */
override def nrOfInstances(sys: ActorSystem): Int = override def nrOfInstances(sys: ActorSystem): Int =
if (settings.allowLocalRoutees && settings.useRole.isDefined) { if (settings.allowLocalRoutees && settings.useRoles.nonEmpty) {
if (Cluster(sys).selfRoles.contains(settings.useRole.get)) { if (settings.useRoles.subsetOf(Cluster(sys).selfRoles)) {
settings.maxInstancesPerNode settings.maxInstancesPerNode
} else 0 } else 0
} else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { } else if (settings.allowLocalRoutees && settings.useRoles.isEmpty) {
settings.maxInstancesPerNode settings.maxInstancesPerNode
} else 0 } else 0
@ -234,7 +312,7 @@ private[akka] trait ClusterRouterConfigBase extends RouterConfig {
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor // Intercept ClusterDomainEvent and route them to the ClusterRouterActor
override def isManagementMessage(msg: Any): Boolean = override def isManagementMessage(msg: Any): Boolean =
(msg.isInstanceOf[ClusterDomainEvent]) || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg) msg.isInstanceOf[ClusterDomainEvent] || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg)
} }
/** /**
@ -383,17 +461,14 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
def isAvailable(m: Member): Boolean = def isAvailable(m: Member): Boolean =
(m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) && (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) &&
satisfiesRole(m.roles) && satisfiesRoles(m.roles) &&
(settings.allowLocalRoutees || m.address != cluster.selfAddress) (settings.allowLocalRoutees || m.address != cluster.selfAddress)
private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match { private def satisfiesRoles(memberRoles: Set[String]): Boolean = settings.useRoles.subsetOf(memberRoles)
case None true
case Some(r) memberRoles.contains(r)
}
def availableNodes: immutable.SortedSet[Address] = { def availableNodes: immutable.SortedSet[Address] = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRoles(cluster.selfRoles))
// use my own node, cluster information not updated yet // use my own node, cluster information not updated yet
immutable.SortedSet(cluster.selfAddress) immutable.SortedSet(cluster.selfAddress)
else else
@ -404,11 +479,11 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
* Fills in self address for local ActorRef * Fills in self address for local ActorRef
*/ */
def fullAddress(routee: Routee): Address = { def fullAddress(routee: Routee): Address = {
val a = routee match { val address = routee match {
case ActorRefRoutee(ref) ref.path.address case ActorRefRoutee(ref) ref.path.address
case ActorSelectionRoutee(sel) sel.anchor.path.address case ActorSelectionRoutee(sel) sel.anchor.path.address
} }
a match { address match {
case Address(_, _, None, None) cluster.selfAddress case Address(_, _, None, None) cluster.selfAddress
case a a case a a
} }
@ -458,4 +533,3 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
if (isAvailable(m)) addMember(m) if (isAvailable(m)) addMember(m)
} }
} }

View file

@ -76,7 +76,7 @@ abstract class ClusterConsistentHashingGroupSpec extends MultiNodeSpec(ClusterCo
val router = system.actorOf( val router = system.actorOf(
ClusterRouterGroup( ClusterRouterGroup(
local = ConsistentHashingGroup(paths, hashMapping = hashMapping), local = ConsistentHashingGroup(paths, hashMapping = hashMapping),
settings = ClusterRouterGroupSettings(totalInstances = 10, paths, allowLocalRoutees = true, useRole = None)).props(), settings = ClusterRouterGroupSettings(totalInstances = 10, paths, allowLocalRoutees = true)).props(),
"router") "router")
// it may take some time until router receives cluster member events // it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router).size should ===(3) } awaitAssert { currentRoutees(router).size should ===(3) }

View file

@ -124,7 +124,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
val router2 = system.actorOf( val router2 = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
local = ConsistentHashingPool(nrOfInstances = 0), local = ConsistentHashingPool(nrOfInstances = 0),
settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = None)). settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true)).
props(Props[Echo]), props(Props[Echo]),
"router2") "router2")
// it may take some time until router receives cluster member events // it may take some time until router receives cluster member events
@ -159,7 +159,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
val router4 = system.actorOf( val router4 = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
local = ConsistentHashingPool(nrOfInstances = 0, hashMapping = hashMapping), local = ConsistentHashingPool(nrOfInstances = 0, hashMapping = hashMapping),
settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true)).
props(Props[Echo]), props(Props[Echo]),
"router4") "router4")

View file

@ -85,7 +85,7 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {
router = round-robin-pool router = round-robin-pool
cluster { cluster {
enabled = on enabled = on
use-role = a use-roles = ["a"]
max-total-nr-of-instances = 10 max-total-nr-of-instances = 10
} }
} }
@ -115,7 +115,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult
lazy val router2 = system.actorOf( lazy val router2 = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
RoundRobinPool(nrOfInstances = 0), RoundRobinPool(nrOfInstances = 0),
ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true)).
props(Props[SomeActor]), props(Props[SomeActor]),
"router2") "router2")
lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3") lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3")

View file

@ -99,12 +99,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { "pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("b") val roles = Set("b")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6), RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRole = role)). ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRoles = roles)).
props(Props[SomeActor]), props(Props[SomeActor]),
"router-2") "router-2")
@ -129,13 +129,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { "group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("b") val roles = Set("b")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterGroup( ClusterRouterGroup(
RoundRobinGroup(paths = Nil), RoundRobinGroup(paths = Nil),
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
allowLocalRoutees = false, useRole = role)).props, allowLocalRoutees = false, useRoles = roles)).props,
"router-2b") "router-2b")
awaitAssert(currentRoutees(router).size should ===(4)) awaitAssert(currentRoutees(router).size should ===(4))
@ -159,12 +159,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { "pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("b") val roles = Set("b")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6), RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)).
props(Props[SomeActor]), props(Props[SomeActor]),
"router-3") "router-3")
@ -189,13 +189,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { "group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("b") val roles = Set("b")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterGroup( ClusterRouterGroup(
RoundRobinGroup(paths = Nil), RoundRobinGroup(paths = Nil),
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
allowLocalRoutees = true, useRole = role)).props, allowLocalRoutees = true, useRoles = roles)).props,
"router-3b") "router-3b")
awaitAssert(currentRoutees(router).size should ===(4)) awaitAssert(currentRoutees(router).size should ===(4))
@ -219,12 +219,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { "pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("a") val roles = Set("a")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6), RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)).
props(Props[SomeActor]), props(Props[SomeActor]),
"router-4") "router-4")
@ -249,13 +249,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { "group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("a") val roles = Set("a")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterGroup( ClusterRouterGroup(
RoundRobinGroup(paths = Nil), RoundRobinGroup(paths = Nil),
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
allowLocalRoutees = true, useRole = role)).props, allowLocalRoutees = true, useRoles = roles)).props,
"router-4b") "router-4b")
awaitAssert(currentRoutees(router).size should ===(2)) awaitAssert(currentRoutees(router).size should ===(2))
@ -279,12 +279,12 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { "pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("c") val roles = Set("c")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterPool( ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6), RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRoles = roles)).
props(Props[SomeActor]), props(Props[SomeActor]),
"router-5") "router-5")
@ -309,13 +309,13 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
"group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { "group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val role = Some("c") val roles = Set("c")
val router = system.actorOf( val router = system.actorOf(
ClusterRouterGroup( ClusterRouterGroup(
RoundRobinGroup(paths = Nil), RoundRobinGroup(paths = Nil),
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
allowLocalRoutees = true, useRole = role)).props, allowLocalRoutees = true, useRoles = roles)).props,
"router-5b") "router-5b")
awaitAssert(currentRoutees(router).size should ===(6)) awaitAssert(currentRoutees(router).size should ===(6))

View file

@ -57,7 +57,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
service, service,
deployment.get.config, deployment.get.config,
ClusterRouterPool(RoundRobinPool(20), ClusterRouterPoolSettings( ClusterRouterPool(RoundRobinPool(20), ClusterRouterPoolSettings(
totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false)),
ClusterScope, ClusterScope,
Deploy.NoDispatcherGiven, Deploy.NoDispatcherGiven,
Deploy.NoMailboxGiven))) Deploy.NoMailboxGiven)))
@ -73,7 +73,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
service, service,
deployment.get.config, deployment.get.config,
ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings( ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings(
totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false, useRole = None)), totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false)),
ClusterScope, ClusterScope,
"mydispatcher", "mydispatcher",
"mymailbox"))) "mymailbox")))

View file

@ -4,12 +4,12 @@
package akka.cluster.protobuf package akka.cluster.protobuf
import akka.cluster._ import akka.cluster._
import akka.actor.{ Address, ExtendedActorSystem } import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
import akka.routing.{ DefaultOptimalSizeExploringResizer, RoundRobinPool } import akka.routing.RoundRobinPool
import collection.immutable.SortedSet import collection.immutable.SortedSet
import akka.testkit.AkkaSpec import akka.testkit.{ AkkaSpec, TestKit }
class ClusterMessageSerializerSpec extends AkkaSpec( class ClusterMessageSerializerSpec extends AkkaSpec(
"akka.actor.provider = cluster") { "akka.actor.provider = cluster") {
@ -75,9 +75,57 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
} }
"be compatible with wire format of version 2.5.3 (using use-role instead of use-roles)" in {
val system = ActorSystem("ClusterMessageSerializer-old-wire-format")
try {
val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
// the oldSnapshot was created with the version of ClusterRouterPoolSettings in Akka 2.5.3. See issue #23257.
// It was created with:
/*
import org.apache.commons.codec.binary.Hex.encodeHex
val bytes = serializer.toBinary(
ClusterRouterPool(RoundRobinPool(nrOfInstances = 4), ClusterRouterPoolSettings(123, 345, true, Some("role ABC"))))
println(String.valueOf(encodeHex(bytes)))
*/
val oldBytesHex = "0a0f08101205524f5252501a04080418001211087b10d90218012208726f6c6520414243"
import org.apache.commons.codec.binary.Hex.decodeHex
val oldBytes = decodeHex(oldBytesHex.toCharArray)
val result = serializer.fromBinary(oldBytes, classOf[ClusterRouterPool])
result match {
case pool: ClusterRouterPool
pool.settings.totalInstances should ===(123)
pool.settings.maxInstancesPerNode should ===(345)
pool.settings.allowLocalRoutees should ===(true)
pool.settings.useRole should ===(Some("role ABC"))
pool.settings.useRoles should ===(Set("role ABC"))
}
} finally {
TestKit.shutdownActorSystem(system)
}
}
} }
"Cluster router pool" must { "Cluster router pool" must {
"be serializable" in { "be serializable with no role" in {
checkSerialization(ClusterRouterPool(
RoundRobinPool(
nrOfInstances = 4
),
ClusterRouterPoolSettings(
totalInstances = 2,
maxInstancesPerNode = 5,
allowLocalRoutees = true
)
))
}
"be serializable with one role" in {
checkSerialization(ClusterRouterPool( checkSerialization(ClusterRouterPool(
RoundRobinPool( RoundRobinPool(
nrOfInstances = 4 nrOfInstances = 4
@ -86,7 +134,21 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
totalInstances = 2, totalInstances = 2,
maxInstancesPerNode = 5, maxInstancesPerNode = 5,
allowLocalRoutees = true, allowLocalRoutees = true,
useRole = Some("Richard, Duke of Gloucester") useRoles = Set("Richard, Duke of Gloucester")
)
))
}
"be serializable with many roles" in {
checkSerialization(ClusterRouterPool(
RoundRobinPool(
nrOfInstances = 4
),
ClusterRouterPoolSettings(
totalInstances = 2,
maxInstancesPerNode = 5,
allowLocalRoutees = true,
useRoles = Set("Richard, Duke of Gloucester", "Hongzhi Emperor", "Red Rackham")
) )
)) ))
} }

View file

@ -41,8 +41,7 @@ class ClusterRouterSupervisorSpec extends AkkaSpec("""
}), ClusterRouterPoolSettings( }), ClusterRouterPoolSettings(
totalInstances = 1, totalInstances = 1,
maxInstancesPerNode = 1, maxInstancesPerNode = 1,
allowLocalRoutees = true, allowLocalRoutees = true)).
useRole = None)).
props(Props(classOf[KillableActor], testActor)), name = "therouter") props(Props(classOf[KillableActor], testActor)), name = "therouter")
router ! "go away" router ! "go away"

View file

@ -185,7 +185,7 @@ akka.actor.deployment {
routees.paths = ["/user/factorialBackend"] routees.paths = ["/user/factorialBackend"]
cluster { cluster {
enabled = on enabled = on
use-role = backend use-roles = ["backend"]
allow-local-routees = off allow-local-routees = off
} }
} }

View file

@ -606,7 +606,7 @@ akka.actor.deployment {
cluster { cluster {
enabled = on enabled = on
allow-local-routees = on allow-local-routees = on
use-role = compute use-roles = ["compute"]
} }
} }
} }
@ -622,7 +622,7 @@ the router will try to use them as soon as the member status is changed to 'Up'.
The actor paths without address information that are defined in `routees.paths` are used for selecting the The actor paths without address information that are defined in `routees.paths` are used for selecting the
actors to which the messages will be forwarded to by the router. actors to which the messages will be forwarded to by the router.
Messages will be forwarded to the routees using @ref:[ActorSelection](actors.md#actorselection), so the same delivery semantics should be expected. Messages will be forwarded to the routees using @ref:[ActorSelection](actors.md#actorselection), so the same delivery semantics should be expected.
It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying `use-role`. It is possible to limit the lookup of routees to member nodes tagged with a particular set of roles by specifying `use-roles`.
`max-total-nr-of-instances` defines total number of routees in the cluster. By default `max-total-nr-of-instances` `max-total-nr-of-instances` defines total number of routees in the cluster. By default `max-total-nr-of-instances`
is set to a high value (10000) that will result in new routees added to the router when nodes join the cluster. is set to a high value (10000) that will result in new routees added to the router when nodes join the cluster.
@ -693,7 +693,7 @@ akka.actor.deployment {
cluster { cluster {
enabled = on enabled = on
allow-local-routees = on allow-local-routees = on
use-role = compute use-roles = ["compute"]
} }
} }
} }
@ -722,14 +722,14 @@ akka.actor.deployment {
enabled = on enabled = on
max-nr-of-instances-per-node = 3 max-nr-of-instances-per-node = 3
allow-local-routees = on allow-local-routees = on
use-role = compute use-roles = ["compute"]
} }
} }
} }
``` ```
It is possible to limit the deployment of routees to member nodes tagged with a certain role by It is possible to limit the deployment of routees to member nodes tagged with a particular set of roles by
specifying `use-role`. specifying `use-roles`.
`max-total-nr-of-instances` defines total number of routees in the cluster, but the number of routees `max-total-nr-of-instances` defines total number of routees in the cluster, but the number of routees
per node, `max-nr-of-instances-per-node`, will not be exceeded. By default `max-total-nr-of-instances` per node, `max-nr-of-instances-per-node`, will not be exceeded. By default `max-total-nr-of-instances`
@ -797,7 +797,7 @@ akka.actor.deployment {
enabled = on enabled = on
max-nr-of-instances-per-node = 3 max-nr-of-instances-per-node = 3
allow-local-routees = on allow-local-routees = on
use-role = compute use-roles = ["compute"]
} }
} }
} }

View file

@ -421,6 +421,14 @@ and here is a summary of things to consider.
* [mig25_sharding_store](#mig25-sharding-store) * [mig25_sharding_store](#mig25-sharding-store)
* [mig25_mutual](#mig25-mutual) * [mig25_mutual](#mig25-mutual)
#### Limit lookup of routees to nodes tagged with multiple roles
Starting with 2.5.4, cluster routing supports delivering messages to routees tagged with all specified roles
using `use-roles` (instead of `use-role` in previous versions). When doing rolling upgrades and using this new feature,
it is important to first upgrade the existing nodes to the latest version of Akka
and then start using multiple roles in a separate rolling upgrade. Otherwise, if a new node sends a message
with the restriction `use-roles = ["a", "b"]`, that will only require the "a" role on old nodes.
### Coordinated Shutdown ### Coordinated Shutdown
There is a new extension named `CoordinatedShutdown` that will stop certain actors and There is a new extension named `CoordinatedShutdown` that will stop certain actors and

View file

@ -2,6 +2,8 @@ package jdocs.cluster;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.actor.Props; import akka.actor.Props;
@ -77,12 +79,12 @@ abstract class FactorialFrontend2 extends AbstractActor {
int totalInstances = 100; int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", ""); Iterable<String> routeesPaths = Arrays.asList("/user/factorialBackend", "");
boolean allowLocalRoutees = true; boolean allowLocalRoutees = true;
String useRole = "backend"; Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend = getContext().actorOf( ActorRef backend = getContext().actorOf(
new ClusterRouterGroup(new AdaptiveLoadBalancingGroup( new ClusterRouterGroup(new AdaptiveLoadBalancingGroup(
HeapMetricsSelector.getInstance(), Collections.<String> emptyList()), HeapMetricsSelector.getInstance(), Collections.<String> emptyList()),
new ClusterRouterGroupSettings(totalInstances, routeesPaths, new ClusterRouterGroupSettings(totalInstances, routeesPaths,
allowLocalRoutees, useRole)).props(), "factorialBackendRouter2"); allowLocalRoutees, useRoles)).props(), "factorialBackendRouter2");
//#router-lookup-in-code //#router-lookup-in-code
} }
@ -93,12 +95,12 @@ abstract class FactorialFrontend3 extends AbstractActor {
int totalInstances = 100; int totalInstances = 100;
int maxInstancesPerNode = 3; int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false; boolean allowLocalRoutees = false;
String useRole = "backend"; Set<String> useRoles = new HashSet<>(Arrays.asList("backend"));
ActorRef backend = getContext().actorOf( ActorRef backend = getContext().actorOf(
new ClusterRouterPool(new AdaptiveLoadBalancingPool( new ClusterRouterPool(new AdaptiveLoadBalancingPool(
SystemLoadAverageMetricsSelector.getInstance(), 0), SystemLoadAverageMetricsSelector.getInstance(), 0),
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
allowLocalRoutees, useRole)).props(Props allowLocalRoutees, useRoles)).props(Props
.create(FactorialBackend.class)), "factorialBackendRouter3"); .create(FactorialBackend.class)), "factorialBackendRouter3");
//#router-deploy-in-code //#router-deploy-in-code
} }

View file

@ -13,7 +13,10 @@ import akka.actor.AbstractActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
import akka.routing.FromConfig; import akka.routing.FromConfig;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
//#service //#service
public class StatsService extends AbstractActor { public class StatsService extends AbstractActor {
@ -55,11 +58,11 @@ abstract class StatsService2 extends AbstractActor {
Iterable<String> routeesPaths = Collections Iterable<String> routeesPaths = Collections
.singletonList("/user/statsWorker"); .singletonList("/user/statsWorker");
boolean allowLocalRoutees = true; boolean allowLocalRoutees = true;
String useRole = "compute"; Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
ActorRef workerRouter = getContext().actorOf( ActorRef workerRouter = getContext().actorOf(
new ClusterRouterGroup(new ConsistentHashingGroup(routeesPaths), new ClusterRouterGroup(new ConsistentHashingGroup(routeesPaths),
new ClusterRouterGroupSettings(totalInstances, routeesPaths, new ClusterRouterGroupSettings(totalInstances, routeesPaths,
allowLocalRoutees, useRole)).props(), "workerRouter2"); allowLocalRoutees, useRoles)).props(), "workerRouter2");
//#router-lookup-in-code //#router-lookup-in-code
} }
@ -69,11 +72,11 @@ abstract class StatsService3 extends AbstractActor {
int totalInstances = 100; int totalInstances = 100;
int maxInstancesPerNode = 3; int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false; boolean allowLocalRoutees = false;
String useRole = "compute"; Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
ActorRef workerRouter = getContext().actorOf( ActorRef workerRouter = getContext().actorOf(
new ClusterRouterPool(new ConsistentHashingPool(0), new ClusterRouterPool(new ConsistentHashingPool(0),
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
allowLocalRoutees, useRole)).props(Props allowLocalRoutees, useRoles)).props(Props
.create(StatsWorker.class)), "workerRouter3"); .create(StatsWorker.class)), "workerRouter3");
//#router-deploy-in-code //#router-deploy-in-code
} }

View file

@ -78,7 +78,7 @@ abstract class FactorialFrontend2 extends Actor {
AdaptiveLoadBalancingGroup(HeapMetricsSelector), AdaptiveLoadBalancingGroup(HeapMetricsSelector),
ClusterRouterGroupSettings( ClusterRouterGroupSettings(
totalInstances = 100, routeesPaths = List("/user/factorialBackend"), totalInstances = 100, routeesPaths = List("/user/factorialBackend"),
allowLocalRoutees = true, useRole = Some("backend"))).props(), allowLocalRoutees = true, useRoles = Set("backend"))).props(),
name = "factorialBackendRouter2") name = "factorialBackendRouter2")
//#router-lookup-in-code //#router-lookup-in-code
@ -96,7 +96,7 @@ abstract class FactorialFrontend3 extends Actor {
ClusterRouterPool(AdaptiveLoadBalancingPool( ClusterRouterPool(AdaptiveLoadBalancingPool(
SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings( SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings(
totalInstances = 100, maxInstancesPerNode = 3, totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false, useRole = Some("backend"))).props(Props[FactorialBackend]), allowLocalRoutees = false, useRoles = Set("backend"))).props(Props[FactorialBackend]),
name = "factorialBackendRouter3") name = "factorialBackendRouter3")
//#router-deploy-in-code //#router-deploy-in-code
} }