Merge pull request #18398 from akka/wip-13584-WeaklyUp2-patriknw
+clu #13584 Accept joining to be WeaklyUp during network split
This commit is contained in:
commit
acf0e76244
31 changed files with 436 additions and 54 deletions
|
|
@ -122,6 +122,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
|
|||
// TODO collapse to ClusterEvent._ after akka-cluster metrics is gone
|
||||
import ClusterEvent.MemberEvent
|
||||
import ClusterEvent.MemberUp
|
||||
import ClusterEvent.MemberWeaklyUp
|
||||
import ClusterEvent.MemberRemoved
|
||||
import ClusterEvent.MemberExited
|
||||
import ClusterEvent.ReachabilityEvent
|
||||
|
|
@ -174,11 +175,14 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
|
|||
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
|
||||
case state: CurrentClusterState ⇒ receiveState(state)
|
||||
case MemberUp(m) ⇒ addMember(m)
|
||||
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||
case MemberExited(m) ⇒ removeMember(m)
|
||||
case UnreachableMember(m) ⇒ removeMember(m)
|
||||
case ReachableMember(m) ⇒ if (m.status == MemberStatus.Up) addMember(m)
|
||||
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
|
||||
case ReachableMember(m) ⇒
|
||||
if (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp)
|
||||
addMember(m)
|
||||
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -207,7 +211,9 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
|
|||
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`.
|
||||
*/
|
||||
def receiveState(state: CurrentClusterState): Unit =
|
||||
nodes = (state.members -- state.unreachable) collect { case m if m.status == MemberStatus.Up ⇒ m.address }
|
||||
nodes = (state.members -- state.unreachable) collect {
|
||||
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.address
|
||||
}
|
||||
|
||||
/**
|
||||
* Samples the latest metrics for the node, updates metrics statistics in
|
||||
|
|
|
|||
|
|
@ -595,6 +595,10 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Act
|
|||
if (matchingRole(m))
|
||||
nodes += m.address
|
||||
|
||||
case MemberWeaklyUp(m) ⇒
|
||||
if (matchingRole(m))
|
||||
nodes += m.address
|
||||
|
||||
case MemberRemoved(m, _) ⇒
|
||||
if (m.address == selfAddress)
|
||||
context stop self
|
||||
|
|
|
|||
|
|
@ -138,6 +138,10 @@ public final class ClusterMessages {
|
|||
* <code>Removed = 5;</code>
|
||||
*/
|
||||
Removed(5, 5),
|
||||
/**
|
||||
* <code>WeaklyUp = 6;</code>
|
||||
*/
|
||||
WeaklyUp(6, 6),
|
||||
;
|
||||
|
||||
/**
|
||||
|
|
@ -164,6 +168,10 @@ public final class ClusterMessages {
|
|||
* <code>Removed = 5;</code>
|
||||
*/
|
||||
public static final int Removed_VALUE = 5;
|
||||
/**
|
||||
* <code>WeaklyUp = 6;</code>
|
||||
*/
|
||||
public static final int WeaklyUp_VALUE = 6;
|
||||
|
||||
|
||||
public final int getNumber() { return value; }
|
||||
|
|
@ -176,6 +184,7 @@ public final class ClusterMessages {
|
|||
case 3: return Exiting;
|
||||
case 4: return Down;
|
||||
case 5: return Removed;
|
||||
case 6: return WeaklyUp;
|
||||
default: return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -16783,10 +16792,11 @@ public final class ClusterMessages {
|
|||
"ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" +
|
||||
"ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" +
|
||||
" \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" +
|
||||
"\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*T\n\014Me" +
|
||||
"\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014Me" +
|
||||
"mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" +
|
||||
"ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" +
|
||||
"B\035\n\031akka.cluster.protobuf.msgH\001"
|
||||
"\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf.",
|
||||
"msgH\001"
|
||||
};
|
||||
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
|
|||
|
|
@ -157,6 +157,7 @@ enum MemberStatus {
|
|||
Exiting = 3;
|
||||
Down = 4;
|
||||
Removed = 5;
|
||||
WeaklyUp = 6;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -39,7 +39,15 @@ akka {
|
|||
# handling network partitions.
|
||||
# Disable with "off" or specify a duration to enable.
|
||||
down-removal-margin = off
|
||||
|
||||
|
||||
# By default, the leader will not move 'Joining' members to 'Up' during a network
|
||||
# split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
|
||||
# so they become part of the cluster even during a network split. The leader will
|
||||
# move 'WeaklyUp' members to 'Up' status once convergence has been reached. This
|
||||
# feature must be off if some members are running Akka 2.3.X.
|
||||
# WeaklyUp is an EXPERIMENTAL feature.
|
||||
allow-weakly-up-members = off
|
||||
|
||||
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
|
||||
# The roles are part of the membership information and can be used by
|
||||
# routers or other services to distribute work to certain member types,
|
||||
|
|
|
|||
|
|
@ -829,6 +829,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
leaderActionCounter = 0
|
||||
leaderActionsOnConvergence()
|
||||
} else {
|
||||
if (cluster.settings.AllowWeaklyUpMembers)
|
||||
moveJoiningToWeaklyUp()
|
||||
|
||||
leaderActionCounter += 1
|
||||
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
|
||||
logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
|
||||
|
|
@ -859,6 +862,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
}
|
||||
|
||||
def isMinNrOfMembersFulfilled: Boolean = {
|
||||
latestGossip.members.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
|
||||
case (role, threshold) ⇒ latestGossip.members.count(_.hasRole(role)) >= threshold
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Leader actions are as follows:
|
||||
* 1. Move JOINING => UP -- When a node joins the cluster
|
||||
|
|
@ -878,12 +887,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
val localOverview = localGossip.overview
|
||||
val localSeen = localOverview.seen
|
||||
|
||||
def enoughMembers: Boolean = {
|
||||
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
|
||||
case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
|
||||
}
|
||||
}
|
||||
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
|
||||
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
||||
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
|
||||
|
||||
val removedUnreachable = for {
|
||||
node ← localOverview.reachability.allUnreachableOrTerminated
|
||||
|
|
@ -967,6 +972,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
}
|
||||
|
||||
def moveJoiningToWeaklyUp(): Unit = {
|
||||
val localGossip = latestGossip
|
||||
val localMembers = localGossip.members
|
||||
|
||||
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
|
||||
def isJoiningToWeaklyUp(m: Member): Boolean =
|
||||
m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
|
||||
val changedMembers = localMembers.collect {
|
||||
case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp)
|
||||
}
|
||||
|
||||
if (changedMembers.nonEmpty) {
|
||||
// replace changed members
|
||||
val newMembers = changedMembers ++ localMembers
|
||||
val newGossip = localGossip.copy(members = newMembers)
|
||||
updateLatestGossip(newGossip)
|
||||
|
||||
// log status changes
|
||||
changedMembers foreach { m ⇒
|
||||
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Reaps the unreachable members according to the failure detector's verdict.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -121,6 +121,16 @@ object ClusterEvent {
|
|||
def member: Member
|
||||
}
|
||||
|
||||
/**
|
||||
* Member status changed to WeaklyUp.
|
||||
* A joining member can be moved to `WeaklyUp` if convergence
|
||||
* cannot be reached, i.e. there are unreachable nodes.
|
||||
* It will be moved to `Up` when convergence is reached.
|
||||
*/
|
||||
final case class MemberWeaklyUp(member: Member) extends MemberEvent {
|
||||
if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
|
||||
}
|
||||
|
||||
/**
|
||||
* Member status changed to Up.
|
||||
*/
|
||||
|
|
@ -268,8 +278,9 @@ object ClusterEvent {
|
|||
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
|
||||
}
|
||||
val memberEvents = (newMembers ++ changedMembers) collect {
|
||||
case m if m.status == Up ⇒ MemberUp(m)
|
||||
case m if m.status == Exiting ⇒ MemberExited(m)
|
||||
case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m)
|
||||
case m if m.status == Up ⇒ MemberUp(m)
|
||||
case m if m.status == Exiting ⇒ MemberExited(m)
|
||||
// no events for other transitions
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -111,6 +111,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
case HeartbeatTick ⇒ heartbeat()
|
||||
case HeartbeatRsp(from) ⇒ heartbeatRsp(from)
|
||||
case MemberUp(m) ⇒ addMember(m)
|
||||
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||
case UnreachableMember(m) ⇒ unreachableMember(m)
|
||||
case ReachableMember(m) ⇒ reachableMember(m)
|
||||
|
|
@ -120,7 +121,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
|
||||
def init(snapshot: CurrentClusterState): Unit = {
|
||||
val nodes: Set[UniqueAddress] = snapshot.members.collect {
|
||||
case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress
|
||||
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress
|
||||
}(collection.breakOut)
|
||||
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
|
||||
state = state.init(nodes, unreachable)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.cluster
|
||||
|
||||
// TODO remove metrics
|
||||
// TODO remove metrics
|
||||
|
||||
import java.io.Closeable
|
||||
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
|
||||
|
|
@ -24,6 +24,7 @@ import akka.actor.Address
|
|||
import akka.actor.DynamicAccess
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.cluster.MemberStatus.Up
|
||||
import akka.cluster.MemberStatus.WeaklyUp
|
||||
import akka.event.Logging
|
||||
import java.lang.management.MemoryUsage
|
||||
|
||||
|
|
@ -89,11 +90,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
|
|||
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
|
||||
case state: CurrentClusterState ⇒ receiveState(state)
|
||||
case MemberUp(m) ⇒ addMember(m)
|
||||
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||
case MemberExited(m) ⇒ removeMember(m)
|
||||
case UnreachableMember(m) ⇒ removeMember(m)
|
||||
case ReachableMember(m) ⇒ if (m.status == Up) addMember(m)
|
||||
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
|
||||
case ReachableMember(m) ⇒
|
||||
if (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp)
|
||||
addMember(m)
|
||||
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -122,7 +126,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
|
|||
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`.
|
||||
*/
|
||||
def receiveState(state: CurrentClusterState): Unit =
|
||||
nodes = state.members collect { case m if m.status == Up ⇒ m.address }
|
||||
nodes = state.members collect { case m if m.status == Up || m.status == WeaklyUp ⇒ m.address }
|
||||
|
||||
/**
|
||||
* Samples the latest metrics for the node, updates metrics statistics in
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
|
|||
import akka.cluster.ClusterEvent.MemberEvent
|
||||
import akka.cluster.ClusterEvent.MemberUp
|
||||
import akka.cluster.ClusterEvent.MemberRemoved
|
||||
import akka.cluster.ClusterEvent.MemberWeaklyUp
|
||||
import akka.remote.FailureDetectorRegistry
|
||||
import akka.remote.RemoteWatcher
|
||||
|
||||
|
|
@ -72,23 +73,28 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
|
||||
clusterNodes foreach takeOverResponsibility
|
||||
unreachable --= clusterNodes
|
||||
case MemberUp(m) ⇒
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes += m.address
|
||||
takeOverResponsibility(m.address)
|
||||
unreachable -= m.address
|
||||
}
|
||||
case MemberRemoved(m, previousStatus) ⇒
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes -= m.address
|
||||
if (previousStatus == MemberStatus.Down) {
|
||||
quarantine(m.address, Some(m.uniqueAddress.uid))
|
||||
}
|
||||
publishAddressTerminated(m.address)
|
||||
}
|
||||
case _: MemberEvent ⇒ // not interesting
|
||||
case MemberUp(m) ⇒ memberUp(m)
|
||||
case MemberWeaklyUp(m) ⇒ memberUp(m)
|
||||
case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
|
||||
case _: MemberEvent ⇒ // not interesting
|
||||
}
|
||||
|
||||
def memberUp(m: Member): Unit =
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes += m.address
|
||||
takeOverResponsibility(m.address)
|
||||
unreachable -= m.address
|
||||
}
|
||||
|
||||
def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes -= m.address
|
||||
if (previousStatus == MemberStatus.Down) {
|
||||
quarantine(m.address, Some(m.uniqueAddress.uid))
|
||||
}
|
||||
publishAddressTerminated(m.address)
|
||||
}
|
||||
|
||||
override def watchNode(watchee: InternalActorRef) =
|
||||
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
|
||||
|
||||
|
|
@ -103,4 +109,4 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
unwatchNode(address)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,6 +75,8 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
}
|
||||
}
|
||||
|
||||
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
|
||||
|
||||
val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
|
||||
val MinNrOfMembers: Int = {
|
||||
cc.getInt("min-nr-of-members")
|
||||
|
|
|
|||
|
|
@ -108,6 +108,8 @@ object Member {
|
|||
case (_, Exiting) ⇒ true
|
||||
case (Joining, _) ⇒ false
|
||||
case (_, Joining) ⇒ true
|
||||
case (WeaklyUp, _) ⇒ false
|
||||
case (_, WeaklyUp) ⇒ true
|
||||
case _ ⇒ ordering.compare(a, b) <= 0
|
||||
}
|
||||
}
|
||||
|
|
@ -140,17 +142,19 @@ object Member {
|
|||
* Picks the Member with the highest "priority" MemberStatus.
|
||||
*/
|
||||
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
|
||||
case (Removed, _) ⇒ m1
|
||||
case (_, Removed) ⇒ m2
|
||||
case (Down, _) ⇒ m1
|
||||
case (_, Down) ⇒ m2
|
||||
case (Exiting, _) ⇒ m1
|
||||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Joining, _) ⇒ m2
|
||||
case (_, Joining) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
case (Removed, _) ⇒ m1
|
||||
case (_, Removed) ⇒ m2
|
||||
case (Down, _) ⇒ m1
|
||||
case (_, Down) ⇒ m2
|
||||
case (Exiting, _) ⇒ m1
|
||||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Joining, _) ⇒ m2
|
||||
case (_, Joining) ⇒ m1
|
||||
case (WeaklyUp, _) ⇒ m2
|
||||
case (_, WeaklyUp) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -164,6 +168,11 @@ abstract class MemberStatus
|
|||
|
||||
object MemberStatus {
|
||||
@SerialVersionUID(1L) case object Joining extends MemberStatus
|
||||
/**
|
||||
* WeaklyUp is an EXPERIMENTAL feature and is subject to change until
|
||||
* it has received more real world testing.
|
||||
*/
|
||||
@SerialVersionUID(1L) case object WeaklyUp extends MemberStatus
|
||||
@SerialVersionUID(1L) case object Up extends MemberStatus
|
||||
@SerialVersionUID(1L) case object Leaving extends MemberStatus
|
||||
@SerialVersionUID(1L) case object Exiting extends MemberStatus
|
||||
|
|
@ -175,6 +184,13 @@ object MemberStatus {
|
|||
*/
|
||||
def joining: MemberStatus = Joining
|
||||
|
||||
/**
|
||||
* Java API: retrieve the “weaklyUp” status singleton.
|
||||
* WeaklyUp is an EXPERIMENTAL feature and is subject to change until
|
||||
* it has received more real world testing.
|
||||
*/
|
||||
def weaklyUp: MemberStatus = WeaklyUp
|
||||
|
||||
/**
|
||||
* Java API: retrieve the “up” status singleton
|
||||
*/
|
||||
|
|
@ -205,7 +221,8 @@ object MemberStatus {
|
|||
*/
|
||||
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
|
||||
Map(
|
||||
Joining -> Set(Up, Down, Removed),
|
||||
Joining -> Set(WeaklyUp, Up, Down, Removed),
|
||||
WeaklyUp -> Set(Up, Down, Removed),
|
||||
Up -> Set(Leaving, Down, Removed),
|
||||
Leaving -> Set(Exiting, Down, Removed),
|
||||
Down -> Set(Removed),
|
||||
|
|
|
|||
|
|
@ -168,7 +168,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
|
|||
MemberStatus.Leaving -> cm.MemberStatus.Leaving_VALUE,
|
||||
MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
|
||||
MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
|
||||
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE)
|
||||
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE,
|
||||
MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE)
|
||||
|
||||
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) }
|
||||
|
||||
|
|
|
|||
|
|
@ -382,7 +382,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
|
|||
}
|
||||
|
||||
def isAvailable(m: Member): Boolean =
|
||||
m.status == MemberStatus.Up &&
|
||||
(m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) &&
|
||||
satisfiesRole(m.roles) &&
|
||||
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.language.postfixOps
|
||||
import scala.concurrent.duration._
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||
import akka.testkit._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.cluster.MemberStatus.WeaklyUp
|
||||
|
||||
object MemberWeaklyUpSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("""
|
||||
akka.remote.retry-gate-closed-for = 3 s
|
||||
akka.cluster.allow-weakly-up-members = on
|
||||
""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class MemberWeaklyUpMultiJvmNode1 extends MemberWeaklyUpSpec
|
||||
class MemberWeaklyUpMultiJvmNode2 extends MemberWeaklyUpSpec
|
||||
class MemberWeaklyUpMultiJvmNode3 extends MemberWeaklyUpSpec
|
||||
class MemberWeaklyUpMultiJvmNode4 extends MemberWeaklyUpSpec
|
||||
class MemberWeaklyUpMultiJvmNode5 extends MemberWeaklyUpSpec
|
||||
|
||||
abstract class MemberWeaklyUpSpec
|
||||
extends MultiNodeSpec(MemberWeaklyUpSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MemberWeaklyUpSpec._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
val side1 = Vector(first, second)
|
||||
val side2 = Vector(third, fourth, fifth)
|
||||
|
||||
"A cluster of 3 members" must {
|
||||
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(first, third, fourth)
|
||||
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
|
||||
"detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in within(20 seconds) {
|
||||
runOn(first) {
|
||||
// split the cluster in two parts (first, second) / (third, fourth, fifth)
|
||||
for (role1 ← side1; role2 ← side2) {
|
||||
testConductor.blackhole(role1, role2, Direction.Both).await
|
||||
}
|
||||
}
|
||||
enterBarrier("after-split")
|
||||
|
||||
runOn(first) {
|
||||
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(third), address(fourth))))
|
||||
}
|
||||
|
||||
runOn(third, fourth) {
|
||||
awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(first))))
|
||||
}
|
||||
|
||||
enterBarrier("after-2")
|
||||
}
|
||||
|
||||
"accept joining on each side and set status to WeaklyUp" taggedAs LongRunningTest in within(20 seconds) {
|
||||
runOn(second) {
|
||||
Cluster(system).join(first)
|
||||
}
|
||||
runOn(fifth) {
|
||||
Cluster(system).join(fourth)
|
||||
}
|
||||
enterBarrier("joined")
|
||||
|
||||
runOn(side1: _*) {
|
||||
awaitAssert {
|
||||
clusterView.members.size should be(4)
|
||||
clusterView.members.exists { m ⇒ m.address == address(second) && m.status == WeaklyUp } should be(true)
|
||||
}
|
||||
}
|
||||
|
||||
runOn(side2: _*) {
|
||||
awaitAssert {
|
||||
clusterView.members.size should be(4)
|
||||
clusterView.members.exists { m ⇒ m.address == address(fifth) && m.status == WeaklyUp } should be(true)
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("after-3")
|
||||
}
|
||||
|
||||
"change status to Up after healed network partition" taggedAs LongRunningTest in within(20 seconds) {
|
||||
runOn(first) {
|
||||
for (role1 ← side1; role2 ← side2) {
|
||||
testConductor.passThrough(role1, role2, Direction.Both).await
|
||||
}
|
||||
}
|
||||
enterBarrier("after-passThrough")
|
||||
|
||||
awaitAllReachable()
|
||||
awaitMembersUp(5)
|
||||
|
||||
enterBarrier("after-4")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -26,6 +26,17 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig {
|
|||
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
object MinMembersBeforeUpWithWeaklyUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.min-nr-of-members = 3
|
||||
akka.cluster.allow-weakly-up-members = on""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
|
@ -46,6 +57,10 @@ class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec
|
|||
class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec
|
||||
class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec
|
||||
|
||||
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode1 extends MinMembersBeforeUpSpec
|
||||
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode2 extends MinMembersBeforeUpSpec
|
||||
class MinMembersBeforeUpWithWeaklyUpMultiJvmNode3 extends MinMembersBeforeUpSpec
|
||||
|
||||
class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec
|
||||
class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec
|
||||
class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec
|
||||
|
|
@ -63,6 +78,19 @@ abstract class MinMembersBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersB
|
|||
}
|
||||
}
|
||||
|
||||
abstract class MinMembersBeforeUpWithWeaklyUpSpec extends MinMembersBeforeUpBase(MinMembersBeforeUpMultiJvmSpec) {
|
||||
|
||||
override def first: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.first
|
||||
override def second: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.second
|
||||
override def third: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.third
|
||||
|
||||
"Cluster leader" must {
|
||||
"wait with moving members to UP until minimum number of members have joined with weakly up enabled" taggedAs LongRunningTest in {
|
||||
testWaitMovingMembersToUp()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) {
|
||||
|
||||
override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first
|
||||
|
|
|
|||
|
|
@ -273,12 +273,26 @@ leader, also *auto-down* a node after a configured time of unreachability..
|
|||
follows from the fact that the ``unreachable`` node will likely see the rest of
|
||||
the cluster as ``unreachable``, become its own leader and form its own cluster.
|
||||
|
||||
As mentioned before, if a node is ``unreachable`` then gossip convergence is not
|
||||
possible and therefore any ``leader`` actions are also not possible. By enabling
|
||||
``akka.cluster.allow-weakly-up-members`` it is possible to let new joining nodes be
|
||||
promoted while convergence is not yet reached. These ``Joining`` nodes will be
|
||||
promoted as ``WeaklyUp``. Once gossip convergence is reached, the leader will move
|
||||
``WeaklyUp`` members to ``Up``.
|
||||
|
||||
State Diagram for the Member States
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Note that members on the other side of a network partition have no knowledge about
|
||||
the existence of the new members. You should for example not count ``WeaklyUp``
|
||||
members in quorum decisions.
|
||||
|
||||
State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=off``)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. image:: ../images/member-states.png
|
||||
|
||||
State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=on``)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. image:: ../images/member-states-weakly-up.png
|
||||
|
||||
Member States
|
||||
^^^^^^^^^^^^^
|
||||
|
|
@ -286,6 +300,9 @@ Member States
|
|||
- **joining**
|
||||
transient state when joining a cluster
|
||||
|
||||
- **weakly up**
|
||||
transient state while network split (only if ``akka.cluster.allow-weakly-up-members=on``)
|
||||
|
||||
- **up**
|
||||
normal operating state
|
||||
|
||||
|
|
|
|||
BIN
akka-docs/rst/images/member-states-weakly-up.png
Normal file
BIN
akka-docs/rst/images/member-states-weakly-up.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 43 KiB |
|
|
@ -32,6 +32,9 @@ and add the following configuration stanza to your ``application.conf``
|
|||
|
||||
Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``,
|
||||
since it is still enabled in akka-cluster by default (for compatibility with past releases).
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled,
|
||||
will participate in Cluster Metrics collection and dissemination.
|
||||
|
||||
Metrics Collector
|
||||
-----------------
|
||||
|
|
|
|||
|
|
@ -22,6 +22,9 @@ the sender to know the location of the destination actor. This is achieved by se
|
|||
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
|
||||
to route the message with the entity id to the final destination.
|
||||
|
||||
Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_java>`
|
||||
if that feature is enabled.
|
||||
|
||||
An Example
|
||||
----------
|
||||
|
||||
|
|
|
|||
|
|
@ -53,6 +53,9 @@ It's worth noting that messages can always be lost because of the distributed na
|
|||
As always, additional logic should be implemented in the singleton (acknowledgement) and in the
|
||||
client (retry) actors to ensure at-least-once message delivery.
|
||||
|
||||
The singleton instance will not run on members with status :ref:`WeaklyUp <weakly_up_java>` if that feature
|
||||
is enabled.
|
||||
|
||||
Potential problems to be aware of
|
||||
---------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -169,6 +169,41 @@ leaving member will be shutdown after the leader has changed status of the membe
|
|||
automatically, but in case of network failures during this process it might still be necessary
|
||||
to set the node’s status to ``Down`` in order to complete the removal.
|
||||
|
||||
.. _weakly_up_java:
|
||||
|
||||
WeaklyUp Members
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
If a node is ``unreachable`` then gossip convergence is not possible and therefore any
|
||||
``leader`` actions are also not possible. However, we still might want new nodes to join
|
||||
the cluster in this scenario.
|
||||
|
||||
.. warning::
|
||||
|
||||
The WeaklyUp feature is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
|
||||
improve this feature based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply this feature.
|
||||
|
||||
This feature is disabled by default. With a configuration option you can allow this behavior::
|
||||
|
||||
akka.cluster.allow-weakly-up-members = on
|
||||
|
||||
When ``allow-weakly-up-members`` is enabled and there is no gossip convergence,
|
||||
``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the
|
||||
cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp``
|
||||
members to ``Up``.
|
||||
|
||||
You can subscribe to the ``WeaklyUp`` membership event to make use of the members that are
|
||||
in this state, but you should be aware of that members on the other side of a network partition
|
||||
have no knowledge about the existence of the new members. You should for example not count
|
||||
``WeaklyUp`` members in quorum decisions.
|
||||
|
||||
.. warning::
|
||||
|
||||
This feature is only available from Akka 2.4.0 and cannot be used if some of your
|
||||
cluster members are running an older version of Akka.
|
||||
|
||||
|
||||
.. _cluster_subscriber_java:
|
||||
|
||||
Subscribe to Cluster Events
|
||||
|
|
@ -437,6 +472,9 @@ automatically unregistered from the router. When new nodes join the cluster addi
|
|||
routees are added to the router, according to the configuration. Routees are also added
|
||||
when a node becomes reachable again, after having been unreachable.
|
||||
|
||||
Cluster aware routers make use of members with status :ref:`WeaklyUp <weakly_up_java>` if that feature
|
||||
is enabled.
|
||||
|
||||
There are two distinct types of routers.
|
||||
|
||||
* **Group - router that sends messages to the specified path using actor selection**
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ out-of-date value.
|
|||
improve this API based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
|
||||
contents of the ``akka.persistence`` package.
|
||||
|
||||
|
||||
Using the Replicator
|
||||
====================
|
||||
|
||||
|
|
@ -40,6 +40,10 @@ with a specific role. It communicates with other ``Replicator`` instances with t
|
|||
(without address) that are running on other nodes . For convenience it can be used with the
|
||||
``akka.cluster.ddata.DistributedData`` extension.
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled,
|
||||
will currently not participate in Distributed Data, but that is something that should be possible to
|
||||
add in a future release.
|
||||
|
||||
Below is an example of an actor that schedules tick messages to itself and for each tick
|
||||
adds or removes elements from a ``ORSet`` (observed-remove set). It also subscribes to
|
||||
changes of this.
|
||||
|
|
|
|||
|
|
@ -79,6 +79,11 @@ Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with
|
|||
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
|
||||
replies.
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled,
|
||||
will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with
|
||||
``WeaklyUp`` status will receive published messages if the publisher and subscriber are on
|
||||
same side of a network partition.
|
||||
|
||||
A Small Example
|
||||
---------------
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ and add the following configuration stanza to your ``application.conf``
|
|||
Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``,
|
||||
since it is still enabled in akka-cluster by default (for compatibility with past releases).
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled,
|
||||
will participate in Cluster Metrics collection and dissemination.
|
||||
|
||||
Metrics Collector
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,9 @@ the sender to know the location of the destination actor. This is achieved by se
|
|||
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
|
||||
to route the message with the entity id to the final destination.
|
||||
|
||||
Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_scala>`
|
||||
if that feature is enabled.
|
||||
|
||||
An Example
|
||||
----------
|
||||
|
||||
|
|
|
|||
|
|
@ -53,6 +53,9 @@ It's worth noting that messages can always be lost because of the distributed na
|
|||
As always, additional logic should be implemented in the singleton (acknowledgement) and in the
|
||||
client (retry) actors to ensure at-least-once message delivery.
|
||||
|
||||
The singleton instance will not run on members with status :ref:`WeaklyUp <weakly_up_scala>` if that feature
|
||||
is enabled.
|
||||
|
||||
Potential problems to be aware of
|
||||
---------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -163,6 +163,41 @@ leaving member will be shutdown after the leader has changed status of the membe
|
|||
automatically, but in case of network failures during this process it might still be necessary
|
||||
to set the node’s status to ``Down`` in order to complete the removal.
|
||||
|
||||
.. _weakly_up_scala:
|
||||
|
||||
WeaklyUp Members
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
If a node is ``unreachable`` then gossip convergence is not possible and therefore any
|
||||
``leader`` actions are also not possible. However, we still might want new nodes to join
|
||||
the cluster in this scenario.
|
||||
|
||||
.. warning::
|
||||
|
||||
The WeaklyUp feature is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to
|
||||
improve this feature based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply this feature.
|
||||
|
||||
This feature is disabled by default. With a configuration option you can allow this behavior::
|
||||
|
||||
akka.cluster.allow-weakly-up-members = on
|
||||
|
||||
When ``allow-weakly-up-members`` is enabled and there is no gossip convergence,
|
||||
``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the
|
||||
cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp``
|
||||
members to ``Up``.
|
||||
|
||||
You can subscribe to the ``WeaklyUp`` membership event to make use of the members that are
|
||||
in this state, but you should be aware of that members on the other side of a network partition
|
||||
have no knowledge about the existence of the new members. You should for example not count
|
||||
``WeaklyUp`` members in quorum decisions.
|
||||
|
||||
.. warning::
|
||||
|
||||
This feature is only available from Akka 2.4.0 and cannot be used if some of your
|
||||
cluster members are running an older version of Akka.
|
||||
|
||||
|
||||
.. _cluster_subscriber_scala:
|
||||
|
||||
Subscribe to Cluster Events
|
||||
|
|
@ -434,6 +469,9 @@ automatically unregistered from the router. When new nodes join the cluster, add
|
|||
routees are added to the router, according to the configuration. Routees are also added
|
||||
when a node becomes reachable again, after having been unreachable.
|
||||
|
||||
Cluster aware routers make use of members with status :ref:`WeaklyUp <weakly_up_scala>` if that feature
|
||||
is enabled.
|
||||
|
||||
There are two distinct types of routers.
|
||||
|
||||
* **Group - router that sends messages to the specified path using actor selection**
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ out-of-date value.
|
|||
improve this API based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
|
||||
contents of the ``akka.persistence`` package.
|
||||
|
||||
|
||||
Using the Replicator
|
||||
====================
|
||||
|
||||
|
|
@ -40,6 +40,10 @@ with a specific role. It communicates with other ``Replicator`` instances with t
|
|||
(without address) that are running on other nodes . For convenience it can be used with the
|
||||
``akka.cluster.ddata.DistributedData`` extension.
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled,
|
||||
will currently not participate in Distributed Data, but that is something that should be possible to
|
||||
add in a future release.
|
||||
|
||||
Below is an example of an actor that schedules tick messages to itself and for each tick
|
||||
adds or removes elements from a ``ORSet`` (observed-remove set). It also subscribes to
|
||||
changes of this.
|
||||
|
|
|
|||
|
|
@ -79,6 +79,11 @@ Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with
|
|||
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
|
||||
replies.
|
||||
|
||||
Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled,
|
||||
will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with
|
||||
``WeaklyUp`` status will receive published messages if the publisher and subscriber are on
|
||||
same side of a network partition.
|
||||
|
||||
A Small Example
|
||||
---------------
|
||||
|
||||
|
|
|
|||
|
|
@ -566,7 +566,10 @@ object MiMa extends AutoPlugin {
|
|||
FilterAnyProblemStartingWith("akka.remote.serialization.DaemonMsgCreateSerializer"),
|
||||
FilterAnyProblemStartingWith("akka.remote.testconductor.TestConductorProtocol"),
|
||||
FilterAnyProblemStartingWith("akka.cluster.protobuf.msg.ClusterMessages"),
|
||||
FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer")
|
||||
FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer"),
|
||||
|
||||
// #13584 change in internal actor
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterCoreDaemon.akka$cluster$ClusterCoreDaemon$$isJoiningToUp$1")
|
||||
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue