diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala index f8eaa97f82..a2fc73e04c 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala @@ -122,6 +122,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { // TODO collapse to ClusterEvent._ after akka-cluster metrics is gone import ClusterEvent.MemberEvent import ClusterEvent.MemberUp + import ClusterEvent.MemberWeaklyUp import ClusterEvent.MemberRemoved import ClusterEvent.MemberExited import ClusterEvent.ReachabilityEvent @@ -174,11 +175,14 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) case state: CurrentClusterState ⇒ receiveState(state) case MemberUp(m) ⇒ addMember(m) + case MemberWeaklyUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) case MemberExited(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) - case ReachableMember(m) ⇒ if (m.status == MemberStatus.Up) addMember(m) - case _: MemberEvent ⇒ // not interested in other types of MemberEvent + case ReachableMember(m) ⇒ + if (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) + addMember(m) + case _: MemberEvent ⇒ // not interested in other types of MemberEvent } @@ -207,7 +211,9 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`. */ def receiveState(state: CurrentClusterState): Unit = - nodes = (state.members -- state.unreachable) collect { case m if m.status == MemberStatus.Up ⇒ m.address } + nodes = (state.members -- state.unreachable) collect { + case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.address + } /** * Samples the latest metrics for the node, updates metrics statistics in diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala index 1448a13aa8..d2604e4102 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala @@ -595,6 +595,10 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Act if (matchingRole(m)) nodes += m.address + case MemberWeaklyUp(m) ⇒ + if (matchingRole(m)) + nodes += m.address + case MemberRemoved(m, _) ⇒ if (m.address == selfAddress) context stop self diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index fbc9320e46..9793fa2616 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -138,6 +138,10 @@ public final class ClusterMessages { * Removed = 5; */ Removed(5, 5), + /** + * WeaklyUp = 6; + */ + WeaklyUp(6, 6), ; /** @@ -164,6 +168,10 @@ public final class ClusterMessages { * Removed = 5; */ public static final int Removed_VALUE = 5; + /** + * WeaklyUp = 6; + */ + public static final int WeaklyUp_VALUE = 6; public final int getNumber() { return value; } 
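The hunks above extend the internal event handling so that ``MemberWeaklyUp`` is treated like ``MemberUp`` for metrics collection and pub-sub membership. User code can follow the same pattern by subscribing to the new event. A minimal sketch (the listener class and log messages are illustrative, not part of this patch)::

    import akka.actor.{ Actor, ActorLogging }
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent._

    class WeaklyUpAwareListener extends Actor with ActorLogging {
      val cluster = Cluster(context.system)

      // Subscribe to Up, WeaklyUp and Removed events, replaying current state as events.
      override def preStart(): Unit =
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
          classOf[MemberUp], classOf[MemberWeaklyUp], classOf[MemberRemoved])

      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case MemberUp(m)                ⇒ log.info("Member is Up: {}", m.address)
        case MemberWeaklyUp(m)          ⇒ log.info("Member is WeaklyUp: {}", m.address)
        case MemberRemoved(m, previous) ⇒ log.info("Member removed: {} (was {})", m.address, previous)
      }
    }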
@@ -176,6 +184,7 @@ public final class ClusterMessages { case 3: return Exiting; case 4: return Down; case 5: return Removed; + case 6: return WeaklyUp; default: return null; } } @@ -16783,10 +16792,11 @@ public final class ClusterMessages { "ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" + "ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" + " \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" + - "\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*T\n\014Me" + + "\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014Me" + "mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" + "ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" + - "B\035\n\031akka.cluster.protobuf.msgH\001" + "\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf.", + "msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 467b8ce914..8d84285caf 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -157,6 +157,7 @@ enum MemberStatus { Exiting = 3; Down = 4; Removed = 5; + WeaklyUp = 6; } /** diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b6f4969801..5fb3ff3105 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -39,7 +39,15 @@ akka { # handling network partitions. # Disable with "off" or specify a duration to enable. down-removal-margin = off - + + # By default, the leader will not move 'Joining' members to 'Up' during a network + # split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp' + # so they become part of the cluster even during a network split. The leader will + # move 'WeaklyUp' members to 'Up' status once convergence has been reached. This + # feature must be off if some members are running Akka 2.3.X. + # WeaklyUp is an EXPERIMENTAL feature. + allow-weakly-up-members = off + # The roles of this member. List of strings, e.g. roles = ["A", "B"]. 
# The roles are part of the membership information and can be used by # routers or other services to distribute work to certain member types, diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 01a3cc2bea..7a09543853 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -829,6 +829,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with leaderActionCounter = 0 leaderActionsOnConvergence() } else { + if (cluster.settings.AllowWeaklyUpMembers) + moveJoiningToWeaklyUp() + leaderActionCounter += 1 if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) logInfo("Leader can currently not perform its duties, reachability status: [{}], member status: [{}]", @@ -859,6 +862,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } + def isMinNrOfMembersFulfilled: Boolean = { + latestGossip.members.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall { + case (role, threshold) ⇒ latestGossip.members.count(_.hasRole(role)) >= threshold + } + } + /** * Leader actions are as follows: * 1. Move JOINING => UP -- When a node joins the cluster @@ -878,12 +887,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localOverview = localGossip.overview val localSeen = localOverview.seen - def enoughMembers: Boolean = { - localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall { - case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold - } - } - def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers + val enoughMembers: Boolean = isMinNrOfMembersFulfilled + def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers val removedUnreachable = for { node ← localOverview.reachability.allUnreachableOrTerminated @@ -967,6 +972,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } + def moveJoiningToWeaklyUp(): Unit = { + val localGossip = latestGossip + val localMembers = localGossip.members + + val enoughMembers: Boolean = isMinNrOfMembersFulfilled + def isJoiningToWeaklyUp(m: Member): Boolean = + m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress) + val changedMembers = localMembers.collect { + case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp) + } + + if (changedMembers.nonEmpty) { + // replace changed members + val newMembers = changedMembers ++ localMembers + val newGossip = localGossip.copy(members = newMembers) + updateLatestGossip(newGossip) + + // log status changes + changedMembers foreach { m ⇒ + logInfo("Leader is moving node [{}] to [{}]", m.address, m.status) + } + + publish(latestGossip) + } + + } + /** * Reaps the unreachable members according to the failure detector's verdict. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 8332ae8b86..be95f74148 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -121,6 +121,16 @@ object ClusterEvent { def member: Member } + /** + * Member status changed to WeaklyUp. + * A joining member can be moved to `WeaklyUp` if convergence + * cannot be reached, i.e. 
there are unreachable nodes. + * It will be moved to `Up` when convergence is reached. + */ + final case class MemberWeaklyUp(member: Member) extends MemberEvent { + if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member) + } + /** * Member status changed to Up. */ @@ -268,8 +278,9 @@ object ClusterEvent { case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember } val memberEvents = (newMembers ++ changedMembers) collect { - case m if m.status == Up ⇒ MemberUp(m) - case m if m.status == Exiting ⇒ MemberExited(m) + case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m) + case m if m.status == Up ⇒ MemberUp(m) + case m if m.status == Exiting ⇒ MemberExited(m) // no events for other transitions } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index a5ac2e2022..a6e2f41d80 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -111,6 +111,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case HeartbeatTick ⇒ heartbeat() case HeartbeatRsp(from) ⇒ heartbeatRsp(from) case MemberUp(m) ⇒ addMember(m) + case MemberWeaklyUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) case UnreachableMember(m) ⇒ unreachableMember(m) case ReachableMember(m) ⇒ reachableMember(m) @@ -120,7 +121,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def init(snapshot: CurrentClusterState): Unit = { val nodes: Set[UniqueAddress] = snapshot.members.collect { - case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress + case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress }(collection.breakOut) val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress) state = state.init(nodes, unreachable) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index bcf35858b1..bf68babb9a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -4,7 +4,7 @@ package akka.cluster -// TODO remove metrics +// TODO remove metrics import java.io.Closeable import java.lang.System.{ currentTimeMillis ⇒ newTimestamp } @@ -24,6 +24,7 @@ import akka.actor.Address import akka.actor.DynamicAccess import akka.actor.ExtendedActorSystem import akka.cluster.MemberStatus.Up +import akka.cluster.MemberStatus.WeaklyUp import akka.event.Logging import java.lang.management.MemoryUsage @@ -89,11 +90,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) case state: CurrentClusterState ⇒ receiveState(state) case MemberUp(m) ⇒ addMember(m) + case MemberWeaklyUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) case MemberExited(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) - case ReachableMember(m) ⇒ if (m.status == Up) addMember(m) - case _: MemberEvent ⇒ // not interested in other types of MemberEvent + case ReachableMember(m) ⇒ + if (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) + addMember(m) + case _: MemberEvent ⇒ // not interested in other types of MemberEvent } @@ -122,7 +126,7 @@ private[cluster] class 
ClusterMetricsCollector(publisher: ActorRef) extends Acto * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`. */ def receiveState(state: CurrentClusterState): Unit = - nodes = state.members collect { case m if m.status == Up ⇒ m.address } + nodes = state.members collect { case m if m.status == Up || m.status == WeaklyUp ⇒ m.address } /** * Samples the latest metrics for the node, updates metrics statistics in diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 8119bde413..17ed4088c9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -9,6 +9,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved +import akka.cluster.ClusterEvent.MemberWeaklyUp import akka.remote.FailureDetectorRegistry import akka.remote.RemoteWatcher @@ -72,23 +73,28 @@ private[cluster] class ClusterRemoteWatcher( clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility unreachable --= clusterNodes - case MemberUp(m) ⇒ - if (m.address != selfAddress) { - clusterNodes += m.address - takeOverResponsibility(m.address) - unreachable -= m.address - } - case MemberRemoved(m, previousStatus) ⇒ - if (m.address != selfAddress) { - clusterNodes -= m.address - if (previousStatus == MemberStatus.Down) { - quarantine(m.address, Some(m.uniqueAddress.uid)) - } - publishAddressTerminated(m.address) - } - case _: MemberEvent ⇒ // not interesting + case MemberUp(m) ⇒ memberUp(m) + case MemberWeaklyUp(m) ⇒ memberUp(m) + case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus) + case _: MemberEvent ⇒ // not interesting } + def memberUp(m: Member): Unit = + if (m.address != selfAddress) { + clusterNodes += m.address + takeOverResponsibility(m.address) + unreachable -= m.address + } + + def memberRemoved(m: Member, previousStatus: MemberStatus): Unit = + if (m.address != selfAddress) { + clusterNodes -= m.address + if (previousStatus == MemberStatus.Down) { + quarantine(m.address, Some(m.uniqueAddress.uid)) + } + publishAddressTerminated(m.address) + } + override def watchNode(watchee: InternalActorRef) = if (!clusterNodes(watchee.path.address)) super.watchNode(watchee) @@ -103,4 +109,4 @@ private[cluster] class ClusterRemoteWatcher( unwatchNode(address) } -} \ No newline at end of file +} diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index cb66e13fe1..df783c5b2b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -75,6 +75,8 @@ final class ClusterSettings(val config: Config, val systemName: String) { } } + val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members") + val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet val MinNrOfMembers: Int = { cc.getInt("min-nr-of-members") diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 1b33eeaa54..8dc08dde6f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ 
-108,6 +108,8 @@ object Member { case (_, Exiting) ⇒ true case (Joining, _) ⇒ false case (_, Joining) ⇒ true + case (WeaklyUp, _) ⇒ false + case (_, WeaklyUp) ⇒ true case _ ⇒ ordering.compare(a, b) <= 0 } } @@ -140,17 +142,19 @@ object Member { * Picks the Member with the highest "priority" MemberStatus. */ def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { - case (Removed, _) ⇒ m1 - case (_, Removed) ⇒ m2 - case (Down, _) ⇒ m1 - case (_, Down) ⇒ m2 - case (Exiting, _) ⇒ m1 - case (_, Exiting) ⇒ m2 - case (Leaving, _) ⇒ m1 - case (_, Leaving) ⇒ m2 - case (Joining, _) ⇒ m2 - case (_, Joining) ⇒ m1 - case (Up, Up) ⇒ m1 + case (Removed, _) ⇒ m1 + case (_, Removed) ⇒ m2 + case (Down, _) ⇒ m1 + case (_, Down) ⇒ m2 + case (Exiting, _) ⇒ m1 + case (_, Exiting) ⇒ m2 + case (Leaving, _) ⇒ m1 + case (_, Leaving) ⇒ m2 + case (Joining, _) ⇒ m2 + case (_, Joining) ⇒ m1 + case (WeaklyUp, _) ⇒ m2 + case (_, WeaklyUp) ⇒ m1 + case (Up, Up) ⇒ m1 } } @@ -164,6 +168,11 @@ abstract class MemberStatus object MemberStatus { @SerialVersionUID(1L) case object Joining extends MemberStatus + /** + * WeaklyUp is an EXPERIMENTAL feature and is subject to change until + * it has received more real world testing. + */ + @SerialVersionUID(1L) case object WeaklyUp extends MemberStatus @SerialVersionUID(1L) case object Up extends MemberStatus @SerialVersionUID(1L) case object Leaving extends MemberStatus @SerialVersionUID(1L) case object Exiting extends MemberStatus @@ -175,6 +184,13 @@ object MemberStatus { */ def joining: MemberStatus = Joining + /** + * Java API: retrieve the “weaklyUp” status singleton. + * WeaklyUp is an EXPERIMENTAL feature and is subject to change until + * it has received more real world testing. + */ + def weaklyUp: MemberStatus = WeaklyUp + /** * Java API: retrieve the “up” status singleton */ @@ -205,7 +221,8 @@ object MemberStatus { */ private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] = Map( - Joining -> Set(Up, Down, Removed), + Joining -> Set(WeaklyUp, Up, Down, Removed), + WeaklyUp -> Set(Up, Down, Removed), Up -> Set(Leaving, Down, Removed), Leaving -> Set(Exiting, Down, Removed), Down -> Set(Removed), diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 030eba0c32..e0e7e745bb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -168,7 +168,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri MemberStatus.Leaving -> cm.MemberStatus.Leaving_VALUE, MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE, MemberStatus.Down -> cm.MemberStatus.Down_VALUE, - MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE) + MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE, + MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE) private val memberStatusFromInt = memberStatusToInt.map { case (a, b) ⇒ (b, a) } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index a0a9d26125..f691325ec9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -382,7 +382,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ } def 
isAvailable(m: Member): Boolean = - m.status == MemberStatus.Up && + (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp) && satisfiesRole(m.roles) && (settings.allowLocalRoutees || m.address != cluster.selfAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MemberWeaklyUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MemberWeaklyUpSpec.scala new file mode 100644 index 0000000000..d97a240bb3 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MemberWeaklyUpSpec.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.cluster + +import scala.language.postfixOps +import scala.concurrent.duration._ +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.cluster.MemberStatus.WeaklyUp + +object MemberWeaklyUpSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.remote.retry-gate-closed-for = 3 s + akka.cluster.allow-weakly-up-members = on + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) +} + +class MemberWeaklyUpMultiJvmNode1 extends MemberWeaklyUpSpec +class MemberWeaklyUpMultiJvmNode2 extends MemberWeaklyUpSpec +class MemberWeaklyUpMultiJvmNode3 extends MemberWeaklyUpSpec +class MemberWeaklyUpMultiJvmNode4 extends MemberWeaklyUpSpec +class MemberWeaklyUpMultiJvmNode5 extends MemberWeaklyUpSpec + +abstract class MemberWeaklyUpSpec + extends MultiNodeSpec(MemberWeaklyUpSpec) + with MultiNodeClusterSpec { + + import MemberWeaklyUpSpec._ + + muteMarkingAsUnreachable() + + val side1 = Vector(first, second) + val side2 = Vector(third, fourth, fifth) + + "A cluster of 3 members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, third, fourth) + + enterBarrier("after-1") + } + + "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in within(20 seconds) { + runOn(first) { + // split the cluster in two parts (first, second) / (third, fourth, fifth) + for (role1 ← side1; role2 ← side2) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("after-split") + + runOn(first) { + awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(third), address(fourth)))) + } + + runOn(third, fourth) { + awaitAssert(clusterView.unreachableMembers.map(_.address) should be(Set(address(first)))) + } + + enterBarrier("after-2") + } + + "accept joining on each side and set status to WeaklyUp" taggedAs LongRunningTest in within(20 seconds) { + runOn(second) { + Cluster(system).join(first) + } + runOn(fifth) { + Cluster(system).join(fourth) + } + enterBarrier("joined") + + runOn(side1: _*) { + awaitAssert { + clusterView.members.size should be(4) + clusterView.members.exists { m ⇒ m.address == address(second) && m.status == WeaklyUp } should be(true) + } + } + + runOn(side2: _*) { + awaitAssert { + clusterView.members.size should be(4) + clusterView.members.exists { m ⇒ m.address == address(fifth) && m.status == WeaklyUp } should be(true) + } + } + + enterBarrier("after-3") + } + + "change status to Up after healed network partition" taggedAs LongRunningTest in within(20 
seconds) { + runOn(first) { + for (role1 ← side1; role2 ← side2) { + testConductor.passThrough(role1, role2, Direction.Both).await + } + } + enterBarrier("after-passThrough") + + awaitAllReachable() + awaitMembersUp(5) + + enterBarrier("after-4") + } + + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala index e05e44d834..443748e14c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala @@ -26,6 +26,17 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } +object MinMembersBeforeUpWithWeaklyUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster.min-nr-of-members = 3 + akka.cluster.allow-weakly-up-members = on""")). + withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} + object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -46,6 +57,10 @@ class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec +class MinMembersBeforeUpWithWeaklyUpMultiJvmNode1 extends MinMembersBeforeUpSpec +class MinMembersBeforeUpWithWeaklyUpMultiJvmNode2 extends MinMembersBeforeUpSpec +class MinMembersBeforeUpWithWeaklyUpMultiJvmNode3 extends MinMembersBeforeUpSpec + class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec @@ -63,6 +78,19 @@ abstract class MinMembersBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersB } } +abstract class MinMembersBeforeUpWithWeaklyUpSpec extends MinMembersBeforeUpBase(MinMembersBeforeUpMultiJvmSpec) { + + override def first: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.first + override def second: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.second + override def third: RoleName = MinMembersBeforeUpWithWeaklyUpMultiJvmSpec.third + + "Cluster leader" must { + "wait with moving members to UP until minimum number of members have joined with weakly up enabled" taggedAs LongRunningTest in { + testWaitMovingMembersToUp() + } + } +} + abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) { override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first diff --git a/akka-docs/rst/common/cluster.rst b/akka-docs/rst/common/cluster.rst index ff6c3ecbf9..ff1becd6f6 100644 --- a/akka-docs/rst/common/cluster.rst +++ b/akka-docs/rst/common/cluster.rst @@ -273,12 +273,26 @@ leader, also *auto-down* a node after a configured time of unreachability.. follows from the fact that the ``unreachable`` node will likely see the rest of the cluster as ``unreachable``, become its own leader and form its own cluster. +As mentioned before, if a node is ``unreachable`` then gossip convergence is not +possible and therefore any ``leader`` actions are also not possible. 
By enabling +``akka.cluster.allow-weakly-up-members`` it is possible to let new joining nodes be +promoted while convergence is not yet reached. These ``Joining`` nodes will be +promoted to ``WeaklyUp``. Once gossip convergence is reached, the leader will move +``WeaklyUp`` members to ``Up``. -State Diagram for the Member States -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Note that members on the other side of a network partition have no knowledge about +the existence of the new members. You should for example not count ``WeaklyUp`` +members in quorum decisions. + +State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=off``) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. image:: ../images/member-states.png +State Diagram for the Member States (``akka.cluster.allow-weakly-up-members=on``) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. image:: ../images/member-states-weakly-up.png Member States ^^^^^^^^^^^^^ @@ -286,6 +300,9 @@ Member States - **joining** transient state when joining a cluster +- **weakly up** + transient state during a network split (only if ``akka.cluster.allow-weakly-up-members=on``) + - **up** normal operating state diff --git a/akka-docs/rst/images/member-states-weakly-up.png b/akka-docs/rst/images/member-states-weakly-up.png new file mode 100644 index 0000000000..8993e9acc1 Binary files /dev/null and b/akka-docs/rst/images/member-states-weakly-up.png differ diff --git a/akka-docs/rst/java/cluster-metrics.rst b/akka-docs/rst/java/cluster-metrics.rst index 33d90ade46..5c6ad4a75c 100644 --- a/akka-docs/rst/java/cluster-metrics.rst +++ b/akka-docs/rst/java/cluster-metrics.rst @@ -32,6 +32,9 @@ and add the following configuration stanza to your ``application.conf`` Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``, since it is still enabled in akka-cluster by default (for compatibility with past releases). + +Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled, +will participate in Cluster Metrics collection and dissemination. Metrics Collector ----------------- diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index b5caa18af8..ad200770dd 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -22,6 +22,9 @@ the sender to know the location of the destination actor. This is achieved by se the messages via a ``ShardRegion`` actor provided by this extension, which knows how to route the message with the entity id to the final destination. +Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_java>` +if that feature is enabled. + An Example ---------- diff --git a/akka-docs/rst/java/cluster-singleton.rst b/akka-docs/rst/java/cluster-singleton.rst index be37285dfb..b99ce381d6 100644 --- a/akka-docs/rst/java/cluster-singleton.rst +++ b/akka-docs/rst/java/cluster-singleton.rst @@ -53,6 +53,9 @@ It's worth noting that messages can always be lost because of the distributed na As always, additional logic should be implemented in the singleton (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. +The singleton instance will not run on members with status :ref:`WeaklyUp <weakly_up_java>` if that feature +is enabled.
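The advice above about quorum decisions deserves emphasis: a ``WeaklyUp`` member is only known on its own side of a partition, so counting it towards a quorum could let both sides believe they have a majority. A sketch of a conservative check, assuming an application-provided ``expectedClusterSize`` (the helper name is hypothetical)::

    import akka.cluster.{ Cluster, MemberStatus }

    // Count only fully 'Up' members towards the quorum; WeaklyUp members
    // may be invisible to the other side of a network partition.
    def hasQuorum(cluster: Cluster, expectedClusterSize: Int): Boolean = {
      val fullyUp = cluster.state.members.count(_.status == MemberStatus.Up)
      fullyUp > expectedClusterSize / 2
    }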
+ Potential problems to be aware of --------------------------------- diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 3ba75351fd..f0cdbe40cb 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -169,6 +169,41 @@ leaving member will be shutdown after the leader has changed status of the membe automatically, but in case of network failures during this process it might still be necessary to set the node’s status to ``Down`` in order to complete the removal. +.. _weakly_up_java: + +WeaklyUp Members +^^^^^^^^^^^^^^^^ + +If a node is ``unreachable`` then gossip convergence is not possible and therefore any +``leader`` actions are also not possible. However, we still might want new nodes to join +the cluster in this scenario. + +.. warning:: + + The WeaklyUp feature is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this feature based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to this feature. + +This feature is disabled by default. You can enable it with a configuration option:: + + akka.cluster.allow-weakly-up-members = on + +When ``allow-weakly-up-members`` is enabled and there is no gossip convergence, +``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the +cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp`` +members to ``Up``. + +You can subscribe to the ``WeaklyUp`` membership event to make use of the members that are +in this state, but you should be aware that members on the other side of a network partition +have no knowledge about the existence of the new members. You should for example not count +``WeaklyUp`` members in quorum decisions. + +.. warning:: + + This feature is only available from Akka 2.4.0 and cannot be used if some of your + cluster members are running an older version of Akka. + + .. _cluster_subscriber_java: Subscribe to Cluster Events @@ -437,6 +472,9 @@ automatically unregistered from the router. When new nodes join the cluster addi routees are added to the router, according to the configuration. Routees are also added when a node becomes reachable again, after having been unreachable. +Cluster aware routers make use of members with status :ref:`WeaklyUp <weakly_up_java>` if that feature +is enabled. + There are two distinct types of routers. * **Group - router that sends messages to the specified path using actor selection** diff --git a/akka-docs/rst/java/distributed-data.rst b/akka-docs/rst/java/distributed-data.rst index e9299ecb43..3bbf9869d5 100644 --- a/akka-docs/rst/java/distributed-data.rst +++ b/akka-docs/rst/java/distributed-data.rst @@ -30,7 +30,7 @@ out-of-date value. improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the ``akka.persistence`` package. - + Using the Replicator ==================== @@ -40,6 +40,10 @@ with a specific role. It communicates with other ``Replicator`` instances with t (without address) that are running on other nodes . For convenience it can be used with the ``akka.cluster.ddata.DistributedData`` extension.
+Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled, +will currently not participate in Distributed Data, but that is something that should be possible to +add in a future release. + Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from a ``ORSet`` (observed-remove set). It also subscribes to changes of this. diff --git a/akka-docs/rst/java/distributed-pub-sub.rst b/akka-docs/rst/java/distributed-pub-sub.rst index f5a05ac721..6142daf8b6 100644 --- a/akka-docs/rst/java/distributed-pub-sub.rst +++ b/akka-docs/rst/java/distributed-pub-sub.rst @@ -79,6 +79,11 @@ Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with ``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck`` replies. +Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled, +will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with +``WeaklyUp`` status will receive published messages if the publisher and subscriber are on the +same side of a network partition. + A Small Example --------------- diff --git a/akka-docs/rst/scala/cluster-metrics.rst b/akka-docs/rst/scala/cluster-metrics.rst index ff456c75f8..64cea2e988 100644 --- a/akka-docs/rst/scala/cluster-metrics.rst +++ b/akka-docs/rst/scala/cluster-metrics.rst @@ -29,6 +29,9 @@ and add the following configuration stanza to your ``application.conf`` Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``, since it is still enabled in akka-cluster by default (for compatibility with past releases). +Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled, +will participate in Cluster Metrics collection and dissemination. + Metrics Collector ----------------- diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 1f0f0b8957..4dff2f462a 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -22,6 +22,9 @@ the sender to know the location of the destination actor. This is achieved by se the messages via a ``ShardRegion`` actor provided by this extension, which knows how to route the message with the entity id to the final destination. +Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_scala>` +if that feature is enabled. + An Example ---------- diff --git a/akka-docs/rst/scala/cluster-singleton.rst b/akka-docs/rst/scala/cluster-singleton.rst index 4fae06ed17..c1631f5b5e 100644 --- a/akka-docs/rst/scala/cluster-singleton.rst +++ b/akka-docs/rst/scala/cluster-singleton.rst @@ -53,6 +53,9 @@ It's worth noting that messages can always be lost because of the distributed na As always, additional logic should be implemented in the singleton (acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. +The singleton instance will not run on members with status :ref:`WeaklyUp <weakly_up_scala>` if that feature +is enabled.
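As the Distributed Publish Subscribe note above says, subscribers on ``WeaklyUp`` nodes receive messages published on their own side of a partition. A sketch of such a subscriber, assuming the ``akka-cluster-tools`` mediator from this patch (the topic name is arbitrary)::

    import akka.actor.{ Actor, ActorLogging }
    import akka.cluster.pubsub.DistributedPubSub
    import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }

    class ContentSubscriber extends Actor with ActorLogging {
      // The mediator registers this node even while it is only WeaklyUp.
      val mediator = DistributedPubSub(context.system).mediator
      mediator ! Subscribe("content", self)

      def receive = {
        case SubscribeAck(Subscribe("content", None, `self`)) ⇒
          log.info("subscribed to topic 'content'")
        case msg: String ⇒
          log.info("received: {}", msg)
      }
    }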
+ Potential problems to be aware of --------------------------------- diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index de48ed689b..344dcde4ef 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -163,6 +163,41 @@ leaving member will be shutdown after the leader has changed status of the membe automatically, but in case of network failures during this process it might still be necessary to set the node’s status to ``Down`` in order to complete the removal. +.. _weakly_up_scala: + +WeaklyUp Members +^^^^^^^^^^^^^^^^ + +If a node is ``unreachable`` then gossip convergence is not possible and therefore any +``leader`` actions are also not possible. However, we still might want new nodes to join +the cluster in this scenario. + +.. warning:: + + The WeaklyUp feature is marked as **“experimental”** as of its introduction in Akka 2.4.0. We will continue to + improve this feature based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to this feature. + +This feature is disabled by default. You can enable it with a configuration option:: + + akka.cluster.allow-weakly-up-members = on + +When ``allow-weakly-up-members`` is enabled and there is no gossip convergence, +``Joining`` members will be promoted to ``WeaklyUp`` and they will become part of the +cluster. Once gossip convergence is reached, the leader will move ``WeaklyUp`` +members to ``Up``. + +You can subscribe to the ``WeaklyUp`` membership event to make use of the members that are +in this state, but you should be aware that members on the other side of a network partition +have no knowledge about the existence of the new members. You should for example not count +``WeaklyUp`` members in quorum decisions. + +.. warning:: + + This feature is only available from Akka 2.4.0 and cannot be used if some of your + cluster members are running an older version of Akka. + + .. _cluster_subscriber_scala: Subscribe to Cluster Events @@ -434,6 +469,9 @@ automatically unregistered from the router. When new nodes join the cluster, add routees are added to the router, according to the configuration. Routees are also added when a node becomes reachable again, after having been unreachable. +Cluster aware routers make use of members with status :ref:`WeaklyUp <weakly_up_scala>` if that feature +is enabled. + There are two distinct types of routers. * **Group - router that sends messages to the specified path using actor selection** diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index 69d1f181f0..2686aeaad0 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -30,7 +30,7 @@ out-of-date value. improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the ``akka.persistence`` package. - + Using the Replicator ==================== @@ -40,6 +40,10 @@ with a specific role. It communicates with other ``Replicator`` instances with t (without address) that are running on other nodes . For convenience it can be used with the ``akka.cluster.ddata.DistributedData`` extension.
+Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled, +will currently not participate in Distributed Data, but that is something that should be possible to +add in a future release. + Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from a ``ORSet`` (observed-remove set). It also subscribes to changes of this. diff --git a/akka-docs/rst/scala/distributed-pub-sub.rst b/akka-docs/rst/scala/distributed-pub-sub.rst index a7be9dd051..ac655e862d 100644 --- a/akka-docs/rst/scala/distributed-pub-sub.rst +++ b/akka-docs/rst/scala/distributed-pub-sub.rst @@ -79,6 +79,11 @@ Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with ``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck`` replies. +Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled, +will participate in Distributed Publish Subscribe, i.e. subscribers on nodes with +``WeaklyUp`` status will receive published messages if the publisher and subscriber are on the +same side of a network partition. + A Small Example --------------- diff --git a/project/MiMa.scala b/project/MiMa.scala index cf0304fc43..4e1c820d4c 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -566,7 +566,10 @@ object MiMa extends AutoPlugin { FilterAnyProblemStartingWith("akka.remote.serialization.DaemonMsgCreateSerializer"), FilterAnyProblemStartingWith("akka.remote.testconductor.TestConductorProtocol"), FilterAnyProblemStartingWith("akka.cluster.protobuf.msg.ClusterMessages"), - FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer") + FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer"), + + // #13584 change in internal actor + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterCoreDaemon.akka$cluster$ClusterCoreDaemon$$isJoiningToUp$1") ) }
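Putting the pieces together, a hypothetical single-node demo that enables the new setting and reuses the listener sketched earlier (system name, host and port are arbitrary; the configuration keys follow the Akka 2.4-era defaults for cluster and remoting)::

    import akka.actor.{ ActorSystem, Props }
    import com.typesafe.config.ConfigFactory

    object WeaklyUpDemo extends App {
      // Enable WeaklyUp promotion alongside a minimal cluster setup.
      val config = ConfigFactory.parseString("""
        akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
        akka.remote.netty.tcp.hostname = "127.0.0.1"
        akka.remote.netty.tcp.port = 2551
        akka.cluster.seed-nodes = ["akka.tcp://WeaklyUpDemo@127.0.0.1:2551"]
        akka.cluster.allow-weakly-up-members = on
      """)
      val system = ActorSystem("WeaklyUpDemo", config)
      system.actorOf(Props[WeaklyUpAwareListener], "listener")
    }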