+clu #13584 Accept joining to be WeaklyUp during network split
* experimental feature, disabled by default * Adding documentation to mention weakly up members. plus adding new diagram.
This commit is contained in:
parent
3a436bb4a3
commit
c08bc317e2
19 changed files with 329 additions and 45 deletions
|
|
@ -122,6 +122,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
|
||||||
// TODO collapse to ClusterEvent._ after akka-cluster metrics is gone
|
// TODO collapse to ClusterEvent._ after akka-cluster metrics is gone
|
||||||
import ClusterEvent.MemberEvent
|
import ClusterEvent.MemberEvent
|
||||||
import ClusterEvent.MemberUp
|
import ClusterEvent.MemberUp
|
||||||
|
import ClusterEvent.MemberWeaklyUp
|
||||||
import ClusterEvent.MemberRemoved
|
import ClusterEvent.MemberRemoved
|
||||||
import ClusterEvent.MemberExited
|
import ClusterEvent.MemberExited
|
||||||
import ClusterEvent.ReachabilityEvent
|
import ClusterEvent.ReachabilityEvent
|
||||||
|
|
@ -174,6 +175,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
|
||||||
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
|
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
|
||||||
case state: CurrentClusterState ⇒ receiveState(state)
|
case state: CurrentClusterState ⇒ receiveState(state)
|
||||||
case MemberUp(m) ⇒ addMember(m)
|
case MemberUp(m) ⇒ addMember(m)
|
||||||
|
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||||
case MemberExited(m) ⇒ removeMember(m)
|
case MemberExited(m) ⇒ removeMember(m)
|
||||||
case UnreachableMember(m) ⇒ removeMember(m)
|
case UnreachableMember(m) ⇒ removeMember(m)
|
||||||
|
|
|
||||||
|
|
@ -138,6 +138,10 @@ public final class ClusterMessages {
|
||||||
* <code>Removed = 5;</code>
|
* <code>Removed = 5;</code>
|
||||||
*/
|
*/
|
||||||
Removed(5, 5),
|
Removed(5, 5),
|
||||||
|
/**
|
||||||
|
* <code>WeaklyUp = 6;</code>
|
||||||
|
*/
|
||||||
|
WeaklyUp(6, 6),
|
||||||
;
|
;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -164,6 +168,10 @@ public final class ClusterMessages {
|
||||||
* <code>Removed = 5;</code>
|
* <code>Removed = 5;</code>
|
||||||
*/
|
*/
|
||||||
public static final int Removed_VALUE = 5;
|
public static final int Removed_VALUE = 5;
|
||||||
|
/**
|
||||||
|
* <code>WeaklyUp = 6;</code>
|
||||||
|
*/
|
||||||
|
public static final int WeaklyUp_VALUE = 6;
|
||||||
|
|
||||||
|
|
||||||
public final int getNumber() { return value; }
|
public final int getNumber() { return value; }
|
||||||
|
|
@ -176,6 +184,7 @@ public final class ClusterMessages {
|
||||||
case 3: return Exiting;
|
case 3: return Exiting;
|
||||||
case 4: return Down;
|
case 4: return Down;
|
||||||
case 5: return Removed;
|
case 5: return Removed;
|
||||||
|
case 6: return WeaklyUp;
|
||||||
default: return null;
|
default: return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -16783,10 +16792,11 @@ public final class ClusterMessages {
|
||||||
"ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" +
|
"ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" +
|
||||||
"ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" +
|
"ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" +
|
||||||
" \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" +
|
" \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" +
|
||||||
"\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*T\n\014Me" +
|
"\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014Me" +
|
||||||
"mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" +
|
"mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" +
|
||||||
"ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" +
|
"ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" +
|
||||||
"B\035\n\031akka.cluster.protobuf.msgH\001"
|
"\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf.",
|
||||||
|
"msgH\001"
|
||||||
};
|
};
|
||||||
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
|
|
||||||
|
|
@ -157,6 +157,7 @@ enum MemberStatus {
|
||||||
Exiting = 3;
|
Exiting = 3;
|
||||||
Down = 4;
|
Down = 4;
|
||||||
Removed = 5;
|
Removed = 5;
|
||||||
|
WeaklyUp = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,14 @@ akka {
|
||||||
# handling network partitions.
|
# handling network partitions.
|
||||||
# Disable with "off" or specify a duration to enable.
|
# Disable with "off" or specify a duration to enable.
|
||||||
down-removal-margin = off
|
down-removal-margin = off
|
||||||
|
|
||||||
|
# By default, the leader will not move 'Joining' members to 'Up' during a network
|
||||||
|
# split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
|
||||||
|
# so they become part of the cluster even during a network split. The leader will
|
||||||
|
# move 'WeaklyUp' members to 'Up' status once convergence has been reached. This
|
||||||
|
# feature must be off if some members are running Akka 2.3.X.
|
||||||
|
allow-weakly-up-members = off
|
||||||
|
|
||||||
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
|
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
|
||||||
# The roles are part of the membership information and can be used by
|
# The roles are part of the membership information and can be used by
|
||||||
# routers or other services to distribute work to certain member types,
|
# routers or other services to distribute work to certain member types,
|
||||||
|
|
|
||||||
|
|
@ -829,6 +829,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
leaderActionCounter = 0
|
leaderActionCounter = 0
|
||||||
leaderActionsOnConvergence()
|
leaderActionsOnConvergence()
|
||||||
} else {
|
} else {
|
||||||
|
if (cluster.settings.AllowWeaklyUpMembers)
|
||||||
|
moveJoiningToWeaklyUp()
|
||||||
|
|
||||||
leaderActionCounter += 1
|
leaderActionCounter += 1
|
||||||
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
|
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
|
||||||
logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
|
logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
|
||||||
|
|
@ -859,6 +862,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def isMinNrOfMembersFulfilled: Boolean = {
|
||||||
|
latestGossip.members.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
|
||||||
|
case (role, threshold) ⇒ latestGossip.members.count(_.hasRole(role)) >= threshold
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Leader actions are as follows:
|
* Leader actions are as follows:
|
||||||
* 1. Move JOINING => UP -- When a node joins the cluster
|
* 1. Move JOINING => UP -- When a node joins the cluster
|
||||||
|
|
@ -878,12 +887,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
val localOverview = localGossip.overview
|
val localOverview = localGossip.overview
|
||||||
val localSeen = localOverview.seen
|
val localSeen = localOverview.seen
|
||||||
|
|
||||||
def enoughMembers: Boolean = {
|
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
||||||
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
|
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
|
||||||
case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
|
|
||||||
}
|
|
||||||
}
|
|
||||||
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
|
|
||||||
|
|
||||||
val removedUnreachable = for {
|
val removedUnreachable = for {
|
||||||
node ← localOverview.reachability.allUnreachableOrTerminated
|
node ← localOverview.reachability.allUnreachableOrTerminated
|
||||||
|
|
@ -967,6 +972,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def moveJoiningToWeaklyUp(): Unit = {
|
||||||
|
val localGossip = latestGossip
|
||||||
|
val localMembers = localGossip.members
|
||||||
|
|
||||||
|
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
||||||
|
def isJoiningToWeaklyUp(m: Member): Boolean =
|
||||||
|
m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
|
||||||
|
val changedMembers = localMembers.collect {
|
||||||
|
case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (changedMembers.nonEmpty) {
|
||||||
|
// replace changed members
|
||||||
|
val newMembers = changedMembers ++ localMembers
|
||||||
|
val newGossip = localGossip.copy(members = newMembers)
|
||||||
|
updateLatestGossip(newGossip)
|
||||||
|
|
||||||
|
// log status changes
|
||||||
|
changedMembers foreach { m ⇒
|
||||||
|
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
|
||||||
|
}
|
||||||
|
|
||||||
|
publish(latestGossip)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reaps the unreachable members according to the failure detector's verdict.
|
* Reaps the unreachable members according to the failure detector's verdict.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -121,6 +121,16 @@ object ClusterEvent {
|
||||||
def member: Member
|
def member: Member
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Member status changed to WeaklyUp.
|
||||||
|
* A joining member can be moved to `WeaklyUp` if convergence
|
||||||
|
* cannot be reached, i.e. there are unreachable nodes.
|
||||||
|
* It will be moved to `Up` when convergence is reached.
|
||||||
|
*/
|
||||||
|
final case class MemberWeaklyUp(member: Member) extends MemberEvent {
|
||||||
|
if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Member status changed to Up.
|
* Member status changed to Up.
|
||||||
*/
|
*/
|
||||||
|
|
@ -268,8 +278,9 @@ object ClusterEvent {
|
||||||
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
|
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
|
||||||
}
|
}
|
||||||
val memberEvents = (newMembers ++ changedMembers) collect {
|
val memberEvents = (newMembers ++ changedMembers) collect {
|
||||||
case m if m.status == Up ⇒ MemberUp(m)
|
case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m)
|
||||||
case m if m.status == Exiting ⇒ MemberExited(m)
|
case m if m.status == Up ⇒ MemberUp(m)
|
||||||
|
case m if m.status == Exiting ⇒ MemberExited(m)
|
||||||
// no events for other transitions
|
// no events for other transitions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
case HeartbeatTick ⇒ heartbeat()
|
case HeartbeatTick ⇒ heartbeat()
|
||||||
case HeartbeatRsp(from) ⇒ heartbeatRsp(from)
|
case HeartbeatRsp(from) ⇒ heartbeatRsp(from)
|
||||||
case MemberUp(m) ⇒ addMember(m)
|
case MemberUp(m) ⇒ addMember(m)
|
||||||
|
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||||
case UnreachableMember(m) ⇒ unreachableMember(m)
|
case UnreachableMember(m) ⇒ unreachableMember(m)
|
||||||
case ReachableMember(m) ⇒ reachableMember(m)
|
case ReachableMember(m) ⇒ reachableMember(m)
|
||||||
|
|
@ -120,7 +121,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
|
|
||||||
def init(snapshot: CurrentClusterState): Unit = {
|
def init(snapshot: CurrentClusterState): Unit = {
|
||||||
val nodes: Set[UniqueAddress] = snapshot.members.collect {
|
val nodes: Set[UniqueAddress] = snapshot.members.collect {
|
||||||
case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress
|
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress
|
||||||
}(collection.breakOut)
|
}(collection.breakOut)
|
||||||
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
|
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
|
||||||
state = state.init(nodes, unreachable)
|
state = state.init(nodes, unreachable)
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
|
||||||
import akka.cluster.ClusterEvent.MemberEvent
|
import akka.cluster.ClusterEvent.MemberEvent
|
||||||
import akka.cluster.ClusterEvent.MemberUp
|
import akka.cluster.ClusterEvent.MemberUp
|
||||||
import akka.cluster.ClusterEvent.MemberRemoved
|
import akka.cluster.ClusterEvent.MemberRemoved
|
||||||
|
import akka.cluster.ClusterEvent.MemberWeaklyUp
|
||||||
import akka.remote.FailureDetectorRegistry
|
import akka.remote.FailureDetectorRegistry
|
||||||
import akka.remote.RemoteWatcher
|
import akka.remote.RemoteWatcher
|
||||||
|
|
||||||
|
|
@ -72,23 +73,28 @@ private[cluster] class ClusterRemoteWatcher(
|
||||||
clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
|
clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
|
||||||
clusterNodes foreach takeOverResponsibility
|
clusterNodes foreach takeOverResponsibility
|
||||||
unreachable --= clusterNodes
|
unreachable --= clusterNodes
|
||||||
case MemberUp(m) ⇒
|
case MemberUp(m) ⇒ memberUp(m)
|
||||||
if (m.address != selfAddress) {
|
case MemberWeaklyUp(m) ⇒ memberUp(m)
|
||||||
clusterNodes += m.address
|
case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
|
||||||
takeOverResponsibility(m.address)
|
case _: MemberEvent ⇒ // not interesting
|
||||||
unreachable -= m.address
|
|
||||||
}
|
|
||||||
case MemberRemoved(m, previousStatus) ⇒
|
|
||||||
if (m.address != selfAddress) {
|
|
||||||
clusterNodes -= m.address
|
|
||||||
if (previousStatus == MemberStatus.Down) {
|
|
||||||
quarantine(m.address, Some(m.uniqueAddress.uid))
|
|
||||||
}
|
|
||||||
publishAddressTerminated(m.address)
|
|
||||||
}
|
|
||||||
case _: MemberEvent ⇒ // not interesting
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def memberUp(m: Member): Unit =
|
||||||
|
if (m.address != selfAddress) {
|
||||||
|
clusterNodes += m.address
|
||||||
|
takeOverResponsibility(m.address)
|
||||||
|
unreachable -= m.address
|
||||||
|
}
|
||||||
|
|
||||||
|
def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
|
||||||
|
if (m.address != selfAddress) {
|
||||||
|
clusterNodes -= m.address
|
||||||
|
if (previousStatus == MemberStatus.Down) {
|
||||||
|
quarantine(m.address, Some(m.uniqueAddress.uid))
|
||||||
|
}
|
||||||
|
publishAddressTerminated(m.address)
|
||||||
|
}
|
||||||
|
|
||||||
override def watchNode(watchee: InternalActorRef) =
|
override def watchNode(watchee: InternalActorRef) =
|
||||||
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
|
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
|
||||||
|
|
||||||
|
|
@ -103,4 +109,4 @@ private[cluster] class ClusterRemoteWatcher(
|
||||||
unwatchNode(address)
|
unwatchNode(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,8 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
|
||||||
|
|
||||||
val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
|
val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
|
||||||
val MinNrOfMembers: Int = {
|
val MinNrOfMembers: Int = {
|
||||||
cc.getInt("min-nr-of-members")
|
cc.getInt("min-nr-of-members")
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,8 @@ object Member {
|
||||||
case (_, Exiting) ⇒ true
|
case (_, Exiting) ⇒ true
|
||||||
case (Joining, _) ⇒ false
|
case (Joining, _) ⇒ false
|
||||||
case (_, Joining) ⇒ true
|
case (_, Joining) ⇒ true
|
||||||
|
case (WeaklyUp, _) ⇒ false
|
||||||
|
case (_, WeaklyUp) ⇒ true
|
||||||
case _ ⇒ ordering.compare(a, b) <= 0
|
case _ ⇒ ordering.compare(a, b) <= 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -140,17 +142,19 @@ object Member {
|
||||||
* Picks the Member with the highest "priority" MemberStatus.
|
* Picks the Member with the highest "priority" MemberStatus.
|
||||||
*/
|
*/
|
||||||
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
|
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
|
||||||
case (Removed, _) ⇒ m1
|
case (Removed, _) ⇒ m1
|
||||||
case (_, Removed) ⇒ m2
|
case (_, Removed) ⇒ m2
|
||||||
case (Down, _) ⇒ m1
|
case (Down, _) ⇒ m1
|
||||||
case (_, Down) ⇒ m2
|
case (_, Down) ⇒ m2
|
||||||
case (Exiting, _) ⇒ m1
|
case (Exiting, _) ⇒ m1
|
||||||
case (_, Exiting) ⇒ m2
|
case (_, Exiting) ⇒ m2
|
||||||
case (Leaving, _) ⇒ m1
|
case (Leaving, _) ⇒ m1
|
||||||
case (_, Leaving) ⇒ m2
|
case (_, Leaving) ⇒ m2
|
||||||
case (Joining, _) ⇒ m2
|
case (Joining, _) ⇒ m2
|
||||||
case (_, Joining) ⇒ m1
|
case (_, Joining) ⇒ m1
|
||||||
case (Up, Up) ⇒ m1
|
case (WeaklyUp, _) ⇒ m2
|
||||||
|
case (_, WeaklyUp) ⇒ m1
|
||||||
|
case (Up, Up) ⇒ m1
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -164,6 +168,7 @@ abstract class MemberStatus
|
||||||
|
|
||||||
object MemberStatus {
|
object MemberStatus {
|
||||||
@SerialVersionUID(1L) case object Joining extends MemberStatus
|
@SerialVersionUID(1L) case object Joining extends MemberStatus
|
||||||
|
@SerialVersionUID(1L) case object WeaklyUp extends MemberStatus
|
||||||
@SerialVersionUID(1L) case object Up extends MemberStatus
|
@SerialVersionUID(1L) case object Up extends MemberStatus
|
||||||
@SerialVersionUID(1L) case object Leaving extends MemberStatus
|
@SerialVersionUID(1L) case object Leaving extends MemberStatus
|
||||||
@SerialVersionUID(1L) case object Exiting extends MemberStatus
|
@SerialVersionUID(1L) case object Exiting extends MemberStatus
|
||||||
|
|
@ -205,7 +210,8 @@ object MemberStatus {
|
||||||
*/
|
*/
|
||||||
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
|
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
|
||||||
Map(
|
Map(
|
||||||
Joining -> Set(Up, Down, Removed),
|
Joining -> Set(WeaklyUp, Up, Down, Removed),
|
||||||
|
WeaklyUp -> Set(Up, Down, Removed),
|
||||||
Up -> Set(Leaving, Down, Removed),
|
Up -> Set(Leaving, Down, Removed),
|
||||||
Leaving -> Set(Exiting, Down, Removed),
|
Leaving -> Set(Exiting, Down, Removed),
|
||||||
Down -> Set(Removed),
|
Down -> Set(Removed),
|
||||||
|
|
|
||||||
|
|
@ -168,7 +168,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
|
||||||
MemberStatus.Leaving -> cm.MemberStatus.Leaving_VALUE,
|
MemberStatus.Leaving -> cm.MemberStatus.Leaving_VALUE,
|
||||||
MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
|
MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
|
||||||
MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
|
MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
|
||||||
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE)
|
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE,
|
||||||
|
MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE)
|
||||||
|
|
||||||
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) }
|
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -382,7 +382,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
def isAvailable(m: Member): Boolean =
|
def isAvailable(m: Member): Boolean =
|
||||||
m.status == MemberStatus.Up &&
|
(m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) &&
|
||||||
satisfiesRole(m.roles) &&
|
satisfiesRole(m.roles) &&
|
||||||
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import scala.language.postfixOps
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||||
|
import akka.testkit._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.cluster.MemberStatus.WeaklyUp
|
||||||
|
|
||||||
|
object MemberWeaklyUpSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
val fourth = role("fourth")
|
||||||
|
val fifth = role("fifth")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).
|
||||||
|
withFallback(ConfigFactory.parseString("""
|
||||||
|
akka.remote.retry-gate-closed-for = 3 s
|
||||||
|
akka.cluster.allow-weakly-up-members = on
|
||||||
|
""")).
|
||||||
|
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
|
}
|
||||||
|
|
||||||
|
class MemberWeaklyUpMultiJvmNode1 extends MemberWeaklyUpSpec
|
||||||
|
class MemberWeaklyUpMultiJvmNode2 extends MemberWeaklyUpSpec
|
||||||
|
class MemberWeaklyUpMultiJvmNode3 extends MemberWeaklyUpSpec
|
||||||
|
class MemberWeaklyUpMultiJvmNode4 extends MemberWeaklyUpSpec
|
||||||
|
class MemberWeaklyUpMultiJvmNode5 extends MemberWeaklyUpSpec
|
||||||
|
|
||||||
|
abstract class MemberWeaklyUpSpec
|
||||||
|
extends MultiNodeSpec(MemberWeaklyUpSpec)
|
||||||
|
with MultiNodeClusterSpec {
|
||||||
|
|
||||||
|
import MemberWeaklyUpSpec._
|
||||||
|
|
||||||
|
muteMarkingAsUnreachable()
|
||||||
|
|
||||||
|
val side1 = Vector(first, second)
|
||||||
|
val side2 = Vector(third, fourth, fifth)
|
||||||
|
|
||||||
|
"A cluster of 3 members" must {
|
||||||
|
|
||||||
|
"reach initial convergence" taggedAs LongRunningTest in {
|
||||||
|
awaitClusterUp(first, third, fourth)
|
||||||
|
|
||||||
|
enterBarrier("after-1")
|
||||||
|
}
|
||||||
|
|
||||||
|
"detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in within(20 seconds) {
|
||||||
|
runOn(first) {
|
||||||
|
// split the cluster in two parts (first, second) / (third, fourth, fifth)
|
||||||
|
for (role1 ← side1; role2 ← side2) {
|
||||||
|
testConductor.blackhole(role1, role2, Direction.Both).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enterBarrier("after-split")
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(third), address(fourth))))
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(third, fourth) {
|
||||||
|
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(first))))
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-2")
|
||||||
|
}
|
||||||
|
|
||||||
|
"accept joining on each side and set status to WeaklyUp" taggedAs LongRunningTest in within(20 seconds) {
|
||||||
|
runOn(second) {
|
||||||
|
Cluster(system).join(first)
|
||||||
|
}
|
||||||
|
runOn(fifth) {
|
||||||
|
Cluster(system).join(fourth)
|
||||||
|
}
|
||||||
|
enterBarrier("joined")
|
||||||
|
|
||||||
|
runOn(side1: _*) {
|
||||||
|
awaitAssert {
|
||||||
|
clusterView.members.size should be(4)
|
||||||
|
clusterView.members.exists { m ⇒ m.address == address(second) && m.status == WeaklyUp } should be(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(side2: _*) {
|
||||||
|
awaitAssert {
|
||||||
|
clusterView.members.size should be(4)
|
||||||
|
clusterView.members.exists { m ⇒ m.address == address(fifth) && m.status == WeaklyUp } should be(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("after-3")
|
||||||
|
}
|
||||||
|
|
||||||
|
"change status to Up after healed network partition" taggedAs LongRunningTest in within(20 seconds) {
|
||||||
|
runOn(first) {
|
||||||
|
for (role1 ← side1; role2 ← side2) {
|
||||||
|
testConductor.passThrough(role1, role2, Direction.Both).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enterBarrier("after-passThrough")
|
||||||
|
|
||||||
|
awaitAllReachable()
|
||||||
|
awaitMembersUp(5)
|
||||||
|
|
||||||
|
enterBarrier("after-4")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -26,6 +26,17 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig {
|
||||||
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object MinMembersBeforeUpWithWeaklyUpMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||||
|
akka.cluster.min-nr-of-members = 3
|
||||||
|
akka.cluster.allow-weakly-up-members = on""")).
|
||||||
|
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||||
|
}
|
||||||
|
|
||||||
object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig {
|
object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig {
|
||||||
val first = role("first")
|
val first = role("first")
|
||||||
val second = role("second")
|
val second = role("second")
|
||||||
|
|
@ -46,6 +57,10 @@ class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec
|
||||||
class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec
|
class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec
|
||||||
class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec
|
class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec
|
||||||
|
|
||||||
|
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode1 extends MinMembersBeforeUpSpec
|
||||||
|
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode2 extends MinMembersBeforeUpSpec
|
||||||
|
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode3 extends MinMembersBeforeUpSpec
|
||||||
|
|
||||||
class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec
|
class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec
|
||||||
class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec
|
class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec
|
||||||
class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec
|
class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec
|
||||||
|
|
@ -63,6 +78,19 @@ abstract class MinMembersBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract class MinMembersBeforeUpWithWeaklyUpSpec extends MinMembersBeforeUpBase(MinMembersBeforeUpMultiJvmSpec) {
|
||||||
|
|
||||||
|
override def first: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.first
|
||||||
|
override def second: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.second
|
||||||
|
override def third: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.third
|
||||||
|
|
||||||
|
"Cluster leader" must {
|
||||||
|
"wait with moving members to UP until minimum number of members have joined with weakly up enabled" taggedAs LongRunningTest in {
|
||||||
|
testWaitMovingMembersToUp()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) {
|
abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) {
|
||||||
|
|
||||||
override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first
|
override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first
|
||||||
|
|
|
||||||
|
|
@ -273,12 +273,22 @@ leader, also *auto-down* a node after a configured time of unreachability..
|
||||||
follows from the fact that the ``unreachable`` node will likely see the rest of
|
follows from the fact that the ``unreachable`` node will likely see the rest of
|
||||||
the cluster as ``unreachable``, become its own leader and form its own cluster.
|
the cluster as ``unreachable``, become its own leader and form its own cluster.
|
||||||
|
|
||||||
|
As mentioned before, if a node is ``unreachable`` then gossip convergence is not
|
||||||
|
possible and therefore any ``leader`` actions are also not possible. By enabling
|
||||||
|
``akka.cluster.allow-weakly-up-members`` it is possible to let new joining nodes be
|
||||||
|
promoted while convergence is not yet reached. These ``Joining`` nodes will be
|
||||||
|
promoted as ``WeaklyUp``. Once gossip convergence is reached, the leader will move
|
||||||
|
``WeaklyUp`` members to ``Up``.
|
||||||
|
|
||||||
State Diagram for the Member States
|
State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=off``)
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
.. image:: ../images/member-states.png
|
.. image:: ../images/member-states.png
|
||||||
|
|
||||||
|
State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=on``)
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
.. image:: ../images/member-states-weakly-up.png
|
||||||
|
|
||||||
Member States
|
Member States
|
||||||
^^^^^^^^^^^^^
|
^^^^^^^^^^^^^
|
||||||
|
|
@ -286,6 +296,9 @@ Member States
|
||||||
- **joining**
|
- **joining**
|
||||||
transient state when joining a cluster
|
transient state when joining a cluster
|
||||||
|
|
||||||
|
- **weakly up**
|
||||||
|
transient state while network split (only if ``akka.cluster.allow-weakly-up-members=on``)
|
||||||
|
|
||||||
- **up**
|
- **up**
|
||||||
normal operating state
|
normal operating state
|
||||||
|
|
||||||
|
|
|
||||||
BIN
akka-docs/rst/images/member-states-weakly-up.png
Normal file
BIN
akka-docs/rst/images/member-states-weakly-up.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 43 KiB |
|
|
@ -169,6 +169,27 @@ leaving member will be shutdown after the leader has changed status of the membe
|
||||||
automatically, but in case of network failures during this process it might still be necessary
|
automatically, but in case of network failures during this process it might still be necessary
|
||||||
to set the node’s status to ``Down`` in order to complete the removal.
|
to set the node’s status to ``Down`` in order to complete the removal.
|
||||||
|
|
||||||
|
WeaklyUp Members
|
||||||
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
If a node is ``unreachable`` then gossip convergence is not possible and therefore any
|
||||||
|
``leader`` actions are also not possible. However, we still might want new nodes to join
|
||||||
|
the cluster in this scenario.
|
||||||
|
|
||||||
|
With a configuration option you can allow this behavior::
|
||||||
|
|
||||||
|
akka.cluster.allow-weakly-up-members = on
|
||||||
|
|
||||||
|
When ``allow-weakly-up-members`` is enabled and there is no gossip convergence,
|
||||||
|
``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the
|
||||||
|
cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp``
|
||||||
|
members to ``Up``.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
This feature is only available from Akka 2.4.0 and cannot be used if some of your
|
||||||
|
cluster members are running an older version of Akka.
|
||||||
|
|
||||||
.. _cluster_subscriber_java:
|
.. _cluster_subscriber_java:
|
||||||
|
|
||||||
Subscribe to Cluster Events
|
Subscribe to Cluster Events
|
||||||
|
|
|
||||||
|
|
@ -163,6 +163,27 @@ leaving member will be shutdown after the leader has changed status of the membe
|
||||||
automatically, but in case of network failures during this process it might still be necessary
|
automatically, but in case of network failures during this process it might still be necessary
|
||||||
to set the node’s status to ``Down`` in order to complete the removal.
|
to set the node’s status to ``Down`` in order to complete the removal.
|
||||||
|
|
||||||
|
WeaklyUp Members
|
||||||
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
If a node is ``unreachable`` then gossip convergence is not possible and therefore any
|
||||||
|
``leader`` actions are also not possible. However, we still might want new nodes to join
|
||||||
|
the cluster in this scenario.
|
||||||
|
|
||||||
|
With a configuration option you can allow this behavior::
|
||||||
|
|
||||||
|
akka.cluster.allow-weakly-up-members = on
|
||||||
|
|
||||||
|
When ``allow-weakly-up-members`` is enabled and there is no gossip convergence,
|
||||||
|
``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the
|
||||||
|
cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp``
|
||||||
|
members to ``Up``.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
This feature is only available from Akka 2.4.0 and cannot be used if some of your
|
||||||
|
cluster members are running an older version of Akka.
|
||||||
|
|
||||||
.. _cluster_subscriber_scala:
|
.. _cluster_subscriber_scala:
|
||||||
|
|
||||||
Subscribe to Cluster Events
|
Subscribe to Cluster Events
|
||||||
|
|
|
||||||
|
|
@ -566,7 +566,10 @@ object MiMa extends AutoPlugin {
|
||||||
FilterAnyProblemStartingWith("akka.remote.serialization.DaemonMsgCreateSerializer"),
|
FilterAnyProblemStartingWith("akka.remote.serialization.DaemonMsgCreateSerializer"),
|
||||||
FilterAnyProblemStartingWith("akka.remote.testconductor.TestConductorProtocol"),
|
FilterAnyProblemStartingWith("akka.remote.testconductor.TestConductorProtocol"),
|
||||||
FilterAnyProblemStartingWith("akka.cluster.protobuf.msg.ClusterMessages"),
|
FilterAnyProblemStartingWith("akka.cluster.protobuf.msg.ClusterMessages"),
|
||||||
FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer")
|
FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer"),
|
||||||
|
|
||||||
|
// #13584 change in internal actor
|
||||||
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterCoreDaemon.akka$cluster$ClusterCoreDaemon$$isJoiningToUp$1")
|
||||||
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue