Merge branch 'master' of github.com:akka/akka
commit 45d3c7dccd
12 changed files with 574 additions and 299 deletions
|
|
@@ -9,7 +9,6 @@ package akka
|
|||
* <ul>
|
||||
* <li>a uuid for tracking purposes</li>
|
||||
* <li>toString that includes exception name, message and uuid</li>
|
||||
* <li>toLongString which also includes the stack trace</li>
|
||||
* </ul>
|
||||
*/
|
||||
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
||||
|
|
|
|||
|
|
@@ -165,8 +165,7 @@ class AccrualFailureDetector(
|
|||
else PhiFactor * timestampDiff / mean
|
||||
}
|
||||
|
||||
// FIXME change to debug log level, when failure detector is stable
|
||||
log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
|
||||
log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
|
||||
phi
|
||||
}
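For reference, a minimal sketch of how a phi value like the one logged above is derived, using invented helper names (illustration only, not the actual AccrualFailureDetector code):

  // phi grows with the silence since the last heartbeat, relative to the mean
  // inter-arrival time of earlier heartbeats, scaled by the configured PhiFactor.
  def phiSketch(millisSinceLastHeartbeat: Long, meanIntervalMillis: Double, phiFactor: Double): Double =
    if (meanIntervalMillis <= 0.0) 0.0
    else phiFactor * millisSinceLastHeartbeat / meanIntervalMillis

  // e.g. a mean interval of 1000 ms, 3000 ms of silence and PhiFactor 1.0 gives phi = 3.0,
  // which is then compared against the configured threshold as in the log statement above.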
|
||||
|
||||
|
|
|
|||
|
|
@@ -6,27 +6,27 @@ package akka.cluster
|
|||
|
||||
import akka.actor._
|
||||
import akka.actor.Status._
|
||||
import akka.ConfigurationException
|
||||
import akka.dispatch.Await
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
import akka.event.Logging
|
||||
import akka.jsr166y.ThreadLocalRandom
|
||||
import akka.pattern.ask
|
||||
import akka.remote._
|
||||
import akka.routing._
|
||||
import akka.event.Logging
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.util._
|
||||
import akka.util.duration._
|
||||
import akka.ConfigurationException
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.jsr166y.ThreadLocalRandom
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.io.Closeable
|
||||
import javax.management._
|
||||
import scala.collection.immutable.{ Map, SortedSet }
|
||||
import scala.annotation.tailrec
|
||||
import com.google.protobuf.ByteString
|
||||
import akka.util.internal.HashedWheelTimer
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
import com.google.protobuf.ByteString
|
||||
import java.io.Closeable
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import javax.management._
|
||||
import MemberStatus._
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable.{ Map, SortedSet }
|
||||
|
||||
/**
|
||||
* Interface for membership change listener.
|
||||
|
|
@@ -52,7 +52,7 @@ sealed trait ClusterMessage extends Serializable
|
|||
/**
|
||||
* Cluster commands sent by the USER.
|
||||
*/
|
||||
object ClusterAction {
|
||||
object ClusterUserAction {
|
||||
|
||||
/**
|
||||
* Command to join the cluster. Sent when a node (represented by 'address')
|
||||
|
|
@@ -69,22 +69,33 @@ object ClusterAction {
|
|||
* Command to mark node as temporary down.
|
||||
*/
|
||||
case class Down(address: Address) extends ClusterMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* Cluster commands sent by the LEADER.
|
||||
*/
|
||||
object ClusterLeaderAction {
|
||||
|
||||
/**
|
||||
* Command to remove a node from the cluster immediately.
|
||||
*/
|
||||
case class Remove(address: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Command to mark a node to be removed from the cluster immediately.
|
||||
* Can only be sent by the leader.
|
||||
*/
|
||||
private[akka] case class Exit(address: Address) extends ClusterMessage
|
||||
private[cluster] case class Exit(address: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Command to remove a node from the cluster immediately.
|
||||
*/
|
||||
private[cluster] case class Remove(address: Address) extends ClusterMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the address and the current status of a cluster member node.
|
||||
*
|
||||
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
|
||||
*/
|
||||
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
|
||||
override def hashCode = address.##
|
||||
|
|
@@ -94,12 +105,12 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
|
|||
}
|
||||
|
||||
/**
|
||||
* Factory and Utility module for Member instances.
|
||||
* Module with factory and ordering methods for Member instances.
|
||||
*/
|
||||
object Member {
|
||||
|
||||
/**
|
||||
* Sort Address by host and port
|
||||
* `Address` ordering type class, sorts addresses by host and port.
|
||||
*/
|
||||
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
|
||||
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
|
||||
|
|
@@ -107,8 +118,14 @@ object Member {
|
|||
else false
|
||||
}
|
||||
|
||||
implicit val ordering: Ordering[Member] = new Ordering[Member] {
|
||||
def compare(x: Member, y: Member) = addressOrdering.compare(x.address, y.address)
|
||||
/**
|
||||
* `Member` ordering type class, sorts members by host and port with the exception that
|
||||
* it puts all members that are in MemberStatus.EXITING last.
|
||||
*/
|
||||
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
|
||||
if (a.status == Exiting && b.status != Exiting) false
|
||||
else if (a.status != Exiting && b.status == Exiting) true
|
||||
else addressOrdering.compare(a.address, b.address) < 0
|
||||
}
|
||||
|
||||
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
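A compact usage sketch of the orderings defined above, with invented addresses; the same behaviour is exercised by the MemberOrderingSpec added later in this commit:

  import scala.collection.immutable.SortedSet
  import akka.actor.AddressFromURIString
  import Member.ordering
  import MemberStatus._

  // Sorted by host and port, except that Exiting members always sort last.
  val members = SortedSet.empty[Member] +
    Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
    Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) +
    Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)

  // members.toSeq yields: 1111 (Joining), 1113 (Up), 1112 (Exiting)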
|
||||
|
|
@@ -157,10 +174,11 @@ case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
|
|||
* Can be one of: Joining, Up, Leaving, Exiting and Down.
|
||||
*/
|
||||
sealed trait MemberStatus extends ClusterMessage {
|
||||
|
||||
/**
|
||||
* Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
|
||||
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
|
||||
*/
|
||||
def isUnavailable: Boolean = this == Down || this == Removed
|
||||
def isUnavailable: Boolean = this == Down
|
||||
}
|
||||
|
||||
object MemberStatus {
|
||||
|
|
@@ -226,6 +244,7 @@ case class Gossip(
|
|||
|
||||
// FIXME can be disabled as optimization
|
||||
assertInvariants
|
||||
|
||||
private def assertInvariants: Unit = {
|
||||
val unreachableAndLive = members.intersect(overview.unreachable)
|
||||
if (unreachableAndLive.nonEmpty)
|
||||
|
|
@@ -251,14 +270,17 @@ case class Gossip(
|
|||
*/
|
||||
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
|
||||
|
||||
/**
|
||||
* Adds a member to the member node ring.
|
||||
*/
|
||||
def :+(member: Member): Gossip = {
|
||||
if (members contains member) this
|
||||
else this copy (members = members + member)
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen'
|
||||
* Map with the VectorClock for the new gossip.
|
||||
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
|
||||
* Map with the VectorClock (version) for the new gossip.
|
||||
*/
|
||||
def seen(address: Address): Gossip = {
|
||||
if (overview.seen.contains(address) && overview.seen(address) == version) this
|
||||
|
|
@@ -282,8 +304,7 @@ case class Gossip(
|
|||
|
||||
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
||||
// and exclude unreachable
|
||||
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).
|
||||
filterNot(mergedUnreachable.contains)
|
||||
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
|
||||
|
||||
// 5. fresh seen table
|
||||
val mergedSeen = Map.empty[Address, VectorClock]
|
||||
|
|
@@ -306,11 +327,14 @@ case class Gossip(
|
|||
case class Heartbeat(from: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Manages routing of the different cluster commands.
|
||||
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
|
||||
*/
|
||||
private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
|
||||
import ClusterAction._
|
||||
private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
|
||||
import ClusterUserAction._
|
||||
import ClusterLeaderAction._
|
||||
|
||||
val log = Logging(context.system, this)
|
||||
|
||||
|
|
@@ -326,10 +350,12 @@ private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Pooled and routed with N number of configurable instances.
|
||||
* Concurrent access to Cluster.
|
||||
*/
|
||||
private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
|
||||
private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
|
||||
def receive = {
|
||||
|
|
@@ -341,9 +367,11 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Supervisor managing the different Cluster daemons.
|
||||
*/
|
||||
private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
|
||||
private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
|
||||
private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")
|
||||
|
|
@@ -396,13 +424,11 @@ trait ClusterNodeMBean {
|
|||
def isSingleton: Boolean
|
||||
def isConvergence: Boolean
|
||||
def isAvailable: Boolean
|
||||
def isRunning: Boolean
|
||||
|
||||
def join(address: String)
|
||||
def leave(address: String)
|
||||
def down(address: String)
|
||||
def remove(address: String)
|
||||
|
||||
def shutdown()
|
||||
}
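The MBean above can be driven from any JMX client. A minimal programmatic sketch; note that the ObjectName "akka:type=Cluster" is an assumption, since the actual clusterMBeanName value does not appear in this diff:

  import java.lang.management.ManagementFactory
  import javax.management.ObjectName

  val server = ManagementFactory.getPlatformMBeanServer
  val name   = new ObjectName("akka:type=Cluster") // assumed; use whatever clusterMBeanName is registered as

  // Ask a node to leave gracefully; the MBean operations take addresses as plain strings.
  server.invoke(name, "leave", Array[AnyRef]("akka://sys@darkstar:2552"), Array("java.lang.String"))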
|
||||
|
||||
/**
|
||||
|
|
@@ -455,7 +481,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
|
||||
private val serialization = remote.serialization
|
||||
|
||||
private val isRunning = new AtomicBoolean(true)
|
||||
private val _isRunning = new AtomicBoolean(true)
|
||||
private val log = Logging(system, "Node")
|
||||
|
||||
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
|
||||
|
|
@@ -566,15 +592,27 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the cluster node is up and running, false if it is shut down.
|
||||
*/
|
||||
def isRunning: Boolean = _isRunning.get
|
||||
|
||||
/**
|
||||
* Latest gossip.
|
||||
*/
|
||||
def latestGossip: Gossip = state.get.latestGossip
|
||||
|
||||
/**
|
||||
* Member status for this node.
|
||||
* Member status for this node (`MemberStatus`).
|
||||
*
|
||||
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
|
||||
* and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the
|
||||
* model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`.
|
||||
*/
|
||||
def status: MemberStatus = self.status
|
||||
def status: MemberStatus = {
|
||||
if (isRunning) self.status
|
||||
else MemberStatus.Removed
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this node the leader?
|
||||
|
|
@@ -606,33 +644,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
*/
|
||||
def isAvailable: Boolean = !isUnavailable(state.get)
|
||||
|
||||
/**
|
||||
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
||||
*/
|
||||
def shutdown(): Unit = {
|
||||
if (isRunning.compareAndSet(true, false)) {
|
||||
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
|
||||
|
||||
// cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
|
||||
gossipTask.cancel()
|
||||
heartbeatTask.cancel()
|
||||
failureDetectorReaperTask.cancel()
|
||||
leaderActionsTask.cancel()
|
||||
clusterScheduler.close()
|
||||
|
||||
// FIXME isTerminated check can be removed when ticket #2221 is fixed
|
||||
// now it prevents logging if system is shutdown (or in progress of shutdown)
|
||||
if (!clusterDaemons.isTerminated)
|
||||
system.stop(clusterDaemons)
|
||||
|
||||
try {
|
||||
mBeanServer.unregisterMBean(clusterMBeanName)
|
||||
} catch {
|
||||
case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a listener to subscribe to cluster membership changes.
|
||||
*/
|
||||
|
|
@@ -661,7 +672,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
*/
|
||||
def join(address: Address): Unit = {
|
||||
val connection = clusterCommandConnectionFor(address)
|
||||
val command = ClusterAction.Join(selfAddress)
|
||||
val command = ClusterUserAction.Join(selfAddress)
|
||||
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
|
||||
connection ! command
|
||||
}
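For completeness, joining from user code looks like the following sketch, given a Cluster instance named 'cluster' (it mirrors the JMX wrapper further down, which also parses the address with AddressFromURIString):

  import akka.actor.AddressFromURIString

  // Point this node at any node that is already part of the cluster.
  cluster.join(AddressFromURIString("akka://sys@darkstar:2552"))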
|
||||
|
|
@@ -670,21 +681,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
* Send command to issue state transition to LEAVING for the node specified by 'address'.
|
||||
*/
|
||||
def leave(address: Address): Unit = {
|
||||
clusterCommandDaemon ! ClusterAction.Leave(address)
|
||||
clusterCommandDaemon ! ClusterUserAction.Leave(address)
|
||||
}
|
||||
|
||||
/**
|
||||
* Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
|
||||
* Send command to DOWN the node specified by 'address'.
|
||||
*/
|
||||
def down(address: Address): Unit = {
|
||||
clusterCommandDaemon ! ClusterAction.Down(address)
|
||||
}
|
||||
|
||||
/**
|
||||
* Send command to issue state transition to REMOVED for the node specified by 'address'.
|
||||
*/
|
||||
def remove(address: Address): Unit = {
|
||||
clusterCommandDaemon ! ClusterAction.Remove(address)
|
||||
clusterCommandDaemon ! ClusterUserAction.Down(address)
|
||||
}
|
||||
|
||||
// ========================================================
|
||||
|
|
@@ -692,22 +696,52 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
// ========================================================
|
||||
|
||||
/**
|
||||
* State transition to JOINING.
|
||||
* New node joining.
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
|
||||
*
|
||||
* Should not be called by the user. The user can issue a LEAVE command which will tell the node
|
||||
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
|
||||
*/
|
||||
private[cluster] def shutdown(): Unit = {
|
||||
if (_isRunning.compareAndSet(true, false)) {
|
||||
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
|
||||
|
||||
// cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
|
||||
gossipTask.cancel()
|
||||
heartbeatTask.cancel()
|
||||
failureDetectorReaperTask.cancel()
|
||||
leaderActionsTask.cancel()
|
||||
clusterScheduler.close()
|
||||
|
||||
// FIXME isTerminated check can be removed when ticket #2221 is fixed
|
||||
// now it prevents logging if system is shutdown (or in progress of shutdown)
|
||||
if (!clusterDaemons.isTerminated)
|
||||
system.stop(clusterDaemons)
|
||||
|
||||
try {
|
||||
mBeanServer.unregisterMBean(clusterMBeanName)
|
||||
} catch {
|
||||
case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
|
||||
}
|
||||
log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
|
||||
}
|
||||
}
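Since shutdown() is internal now, the supported way to take a node out of the cluster is the graceful LEAVE path described in the comment above. A sketch, assuming a Cluster instance and that 'address' is the local node's own address (the assertions mirror the multi-node specs below):

  import akka.actor.Address

  def leaveAndAwaitRemoval(cluster: Cluster, address: Address): Unit = {
    cluster.leave(address)                          // LEAVING -> EXITING -> REMOVED, driven by the leader
    while (cluster.isRunning) Thread.sleep(100)     // the internal shutdown() runs when the leader's Remove arrives
    assert(cluster.status == MemberStatus.Removed)  // tombstone status reported after shutdown
  }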
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* State transition to JOINING - new node joining.
|
||||
*/
|
||||
@tailrec
|
||||
private[cluster] final def joining(node: Address): Unit = {
|
||||
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
|
||||
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
val localUnreachable = localGossip.overview.unreachable
|
||||
|
||||
val alreadyMember = localMembers.exists(_.address == node)
|
||||
val isUnreachable = localUnreachable.exists { m ⇒
|
||||
m.address == node && m.status != Down && m.status != Removed
|
||||
}
|
||||
val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
|
||||
|
||||
if (!alreadyMember && !isUnreachable) {
|
||||
|
||||
|
|
@@ -725,6 +759,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
|
||||
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
|
||||
else {
|
||||
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
|
||||
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
|
||||
if (node != selfAddress) failureDetector heartbeat node
|
||||
notifyMembershipChangeListeners(localState, newState)
|
||||
|
|
@@ -733,52 +768,60 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* State transition to LEAVING.
|
||||
*/
|
||||
@tailrec
|
||||
private[cluster] final def leaving(address: Address) {
|
||||
log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address)
|
||||
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
|
||||
val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
|
||||
val newGossip = localGossip copy (members = newMembers)
|
||||
|
||||
val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
|
||||
val newGossip = localGossip copy (members = newMembers)
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||
|
||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||
|
||||
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
|
||||
else {
|
||||
notifyMembershipChangeListeners(localState, newState)
|
||||
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
|
||||
else {
|
||||
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
|
||||
notifyMembershipChangeListeners(localState, newState)
|
||||
}
|
||||
}
|
||||
}
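The joining/leaving/leaderActions transitions above all follow the same lock-free pattern: read the current state from an AtomicReference, build a new immutable state, and retry if the compareAndSet race is lost. A stripped-down sketch of that pattern, with the Gossip/State details elided:

  import java.util.concurrent.atomic.AtomicReference
  import scala.annotation.tailrec

  final class StateCell[A](initial: A) {
    private val ref = new AtomicReference[A](initial)

    @tailrec final def update(f: A => A): A = {
      val current = ref.get
      val next    = f(current)
      if (ref.compareAndSet(current, next)) next  // we won the race
      else update(f)                              // someone updated concurrently, retry
    }
  }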
|
||||
|
||||
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
|
||||
val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
|
||||
val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
|
||||
if (newMembersStatus != oldMembersStatus)
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* State transition to EXITING.
|
||||
*/
|
||||
private[cluster] final def exiting(address: Address): Unit = {
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address)
|
||||
log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
|
||||
// FIXME implement when we implement hand-off
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* State transition to REMOVED.
|
||||
*
|
||||
* This method is for now only called after the LEADER has sent a Removed message - telling the node
|
||||
* to shut itself down.
|
||||
*
|
||||
* In the future we might change this to allow the USER to send a Removed(address) message telling an
|
||||
* arbitrary node to be moved directly from UP -> REMOVED.
|
||||
*/
|
||||
private[cluster] final def removing(address: Address): Unit = {
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address)
|
||||
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
|
||||
shutdown()
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
|
||||
* and its status is set to DOWN. The node is also removed from the 'seen' table.
|
||||
*
|
||||
|
|
@@ -836,6 +879,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Receive new gossip.
|
||||
*/
|
||||
@tailrec
|
||||
|
|
@@ -849,9 +894,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
val mergedGossip = remoteGossip merge localGossip
|
||||
val versionedMergedGossip = mergedGossip :+ vclockNode
|
||||
|
||||
// FIXME change to debug log level, when failure detector is stable
|
||||
log.info(
|
||||
"""Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""",
|
||||
log.debug(
|
||||
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
|
||||
remoteGossip, localGossip, versionedMergedGossip)
|
||||
|
||||
versionedMergedGossip
|
||||
|
|
@@ -876,7 +920,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
|
||||
|
||||
|
|
@@ -886,11 +930,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
private def autoJoin(): Unit = nodeToJoin foreach join
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Gossips latest gossip to an address.
|
||||
*/
|
||||
private[akka] def gossipTo(address: Address): Unit = {
|
||||
private[cluster] def gossipTo(address: Address): Unit = {
|
||||
val connection = clusterGossipConnectionFor(address)
|
||||
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
|
||||
connection ! GossipEnvelope(selfAddress, latestGossip)
|
||||
|
|
@@ -910,18 +954,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
|
||||
private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
|
||||
(membersSize + unreachableSize) match {
|
||||
case 0 ⇒ 0.0
|
||||
case sum ⇒ unreachableSize.toDouble / sum
|
||||
}
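A quick worked example of the probability above: with 7 reachable members and 3 unreachable ones, an unreachable node is chosen as gossip target with probability 3 / (7 + 3) = 0.3, while an empty cluster yields 0.0 thanks to the zero guard. Given a Cluster instance 'cluster':

  // cluster.gossipToUnreachableProbablity(membersSize = 7, unreachableSize = 3) == 0.3
  // cluster.gossipToUnreachableProbablity(membersSize = 0, unreachableSize = 0) == 0.0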
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
|
||||
private[cluster] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
|
||||
if (nrOfDeputyNodes > membersSize) 1.0
|
||||
else if (nrOfDeputyNodes == 0) 0.0
|
||||
else (membersSize + unreachableSize) match {
|
||||
|
|
@@ -931,11 +975,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Initiates a new round of gossip.
|
||||
*/
|
||||
private[akka] def gossip(): Unit = {
|
||||
private[cluster] def gossip(): Unit = {
|
||||
val localState = state.get
|
||||
|
||||
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
|
||||
|
|
@@ -972,9 +1016,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[akka] def heartbeat(): Unit = {
|
||||
private[cluster] def heartbeat(): Unit = {
|
||||
val localState = state.get
|
||||
|
||||
if (!isSingletonCluster(localState)) {
|
||||
|
|
@@ -989,12 +1033,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
|
||||
*/
|
||||
@tailrec
|
||||
final private[akka] def reapUnreachableMembers(): Unit = {
|
||||
final private[cluster] def reapUnreachableMembers(): Unit = {
|
||||
val localState = state.get
|
||||
|
||||
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
||||
|
|
@@ -1033,124 +1077,187 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
|
||||
*/
|
||||
@tailrec
|
||||
final private[akka] def leaderActions(): Unit = {
|
||||
final private[cluster] def leaderActions(): Unit = {
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
|
||||
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
|
||||
|
||||
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
|
||||
def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = {
|
||||
true
|
||||
}
|
||||
|
||||
if (isLeader && isAvailable(localState)) {
|
||||
// only run the leader actions if we are the LEADER and available
|
||||
|
||||
val localOverview = localGossip.overview
|
||||
val localSeen = localOverview.seen
|
||||
val localUnreachableMembers = localOverview.unreachable
|
||||
val hasPartionHandoffCompletedSuccessfully: Boolean = {
|
||||
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
|
||||
true
|
||||
}
|
||||
|
||||
// Leader actions are as follows:
|
||||
// 1. Move JOINING => UP -- When a node joins the cluster
|
||||
// 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence)
|
||||
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
|
||||
// 2. Move JOINING => UP -- When a node joins the cluster
|
||||
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
|
||||
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
|
||||
// 5. Updating the vclock version for the changes
|
||||
// 6. Updating the 'seen' table
|
||||
// 5. Store away all stuff needed for the side-effecting processing in 10.
|
||||
// 6. Updating the vclock version for the changes
|
||||
// 7. Updating the 'seen' table
|
||||
// 8. Try to update the state with the new gossip
|
||||
// 9. If failure - retry
|
||||
// 10. If success - run all the side-effecting processing
|
||||
|
||||
var hasChangedState = false
|
||||
val newGossip =
|
||||
val (
|
||||
newGossip: Gossip,
|
||||
hasChangedState: Boolean,
|
||||
upMembers,
|
||||
exitingMembers,
|
||||
removedMembers,
|
||||
unreachableButNotDownedMembers) =
|
||||
|
||||
if (convergence(localGossip).isDefined) {
|
||||
// we have convergence - so we can't have unreachable nodes
|
||||
|
||||
// transform the node member ring - filterNot/map/map
|
||||
val newMembers =
|
||||
|
||||
localMembers map { member ⇒
|
||||
localMembers filterNot { member ⇒
|
||||
// ----------------------
|
||||
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
|
||||
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
|
||||
// ----------------------
|
||||
if (member.status == Joining) {
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = Up)
|
||||
} else member
|
||||
member.status == MemberStatus.Exiting
|
||||
|
||||
} map { member ⇒
|
||||
// ----------------------
|
||||
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence)
|
||||
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
|
||||
// ----------------------
|
||||
if (member.status == Exiting) {
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = Removed)
|
||||
} else member
|
||||
if (member.status == Joining) member copy (status = Up)
|
||||
else member
|
||||
|
||||
} map { member ⇒
|
||||
// ----------------------
|
||||
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
|
||||
// ----------------------
|
||||
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = Exiting)
|
||||
} else member
|
||||
|
||||
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
|
||||
else member
|
||||
}
|
||||
localGossip copy (members = newMembers) // update gossip
|
||||
|
||||
// ----------------------
|
||||
// 5. Store away all stuff needed for the side-effecting processing in 10.
|
||||
// ----------------------
|
||||
|
||||
// Check for the need to do side-effecting on successful state change
|
||||
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
|
||||
// to check for state-changes and to store away removed and exiting members for later notification
|
||||
// 1. check for state-changes to update
|
||||
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
|
||||
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
|
||||
|
||||
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
|
||||
|
||||
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
|
||||
|
||||
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
|
||||
|
||||
// removing REMOVED nodes from the 'seen' table
|
||||
val newSeen = localSeen -- removedMembers.map(_.address)
|
||||
|
||||
// removing REMOVED nodes from the 'unreachable' set
|
||||
val newUnreachableMembers = localUnreachableMembers -- removedMembers
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
|
||||
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
|
||||
|
||||
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
|
||||
|
||||
} else if (AutoDown) {
|
||||
// we don't have convergence - so we might have unreachable nodes
|
||||
|
||||
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
|
||||
|
||||
// ----------------------
|
||||
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
|
||||
// ----------------------
|
||||
val newUnreachableMembers =
|
||||
localUnreachableMembers.map { member ⇒
|
||||
// no need to DOWN members already DOWN
|
||||
if (member.status == Down) member
|
||||
else {
|
||||
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = Down)
|
||||
}
|
||||
}
|
||||
|
||||
// removing nodes marked as DOWN from the 'seen' table
|
||||
val newSeen = localSeen -- newUnreachableMembers.collect {
|
||||
case m if m.status == Down ⇒ m.address
|
||||
val newUnreachableMembers = localUnreachableMembers.map { member ⇒
|
||||
// ----------------------
|
||||
// 5. Move UNREACHABLE => DOWN (auto-downing by leader)
|
||||
// ----------------------
|
||||
if (member.status == Down) member // no need to DOWN members already DOWN
|
||||
else member copy (status = Down)
|
||||
}
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
|
||||
localGossip copy (overview = newOverview) // update gossip
|
||||
// Check for the need to do side-effecting on successful state change
|
||||
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
|
||||
|
||||
} else localGossip
|
||||
// removing nodes marked as DOWN from the 'seen' table
|
||||
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
|
||||
val newGossip = localGossip copy (overview = newOverview) // update gossip
|
||||
|
||||
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
|
||||
|
||||
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
|
||||
|
||||
if (hasChangedState) { // we have a change of state - version it and try to update
|
||||
|
||||
// ----------------------
|
||||
// 5. Updating the vclock version for the changes
|
||||
// 6. Updating the vclock version for the changes
|
||||
// ----------------------
|
||||
val versionedGossip = newGossip :+ vclockNode
|
||||
|
||||
// ----------------------
|
||||
// 6. Updating the 'seen' table
|
||||
// 7. Updating the 'seen' table
|
||||
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
|
||||
// ----------------------
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
val seenVersionedGossip =
|
||||
if (removedMembers.exists(_.address == selfAddress)) versionedGossip
|
||||
else versionedGossip seen selfAddress
|
||||
|
||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||
|
||||
// if we won the race then update else try again
|
||||
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
|
||||
else {
|
||||
// ----------------------
|
||||
// 8. Try to update the state with the new gossip
|
||||
// ----------------------
|
||||
if (!state.compareAndSet(localState, newState)) {
|
||||
|
||||
// ----------------------
|
||||
// 9. Failure - retry
|
||||
// ----------------------
|
||||
leaderActions() // recur
|
||||
|
||||
} else {
|
||||
// ----------------------
|
||||
// 10. Success - run all the side-effecting processing
|
||||
// ----------------------
|
||||
|
||||
// if (removedMembers.exists(_.address == selfAddress)) {
|
||||
// // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED
|
||||
// // so now let's gossip out this information directly since there will not be any other chance
|
||||
// gossip()
|
||||
// }
|
||||
|
||||
// log the move of members from joining to up
|
||||
upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
|
||||
|
||||
// tell all removed members to remove and shut down themselves
|
||||
removedMembers foreach { member ⇒
|
||||
val address = member.address
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
|
||||
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Remove(address)
|
||||
}
|
||||
|
||||
// tell all exiting members to exit
|
||||
exitingMembers foreach { member ⇒
|
||||
val address = member.address
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
|
||||
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
|
||||
}
|
||||
|
||||
// log the auto-downing of the unreachable nodes
|
||||
unreachableButNotDownedMembers foreach { member ⇒
|
||||
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
|
||||
}
|
||||
|
||||
notifyMembershipChangeListeners(localState, newState)
|
||||
}
|
||||
}
|
||||
|
|
@@ -1174,9 +1281,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
// Else we can't continue to check for convergence
|
||||
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
|
||||
// and that all members exist in the seen table
|
||||
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒
|
||||
m.status != Down && m.status != Removed
|
||||
}
|
||||
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
|
||||
val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address))
|
||||
|
||||
if (hasUnreachable) {
|
||||
|
|
@@ -1205,14 +1310,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
|
||||
private def isUnavailable(state: State): Boolean = {
|
||||
val localGossip = state.latestGossip
|
||||
val localOverview = localGossip.overview
|
||||
val localMembers = localGossip.members
|
||||
val localUnreachableMembers = localOverview.unreachable
|
||||
val isUnreachable = localUnreachableMembers exists { _.address == selfAddress }
|
||||
val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable }
|
||||
val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress }
|
||||
val hasUnavailableMemberStatus = localGossip.members exists { m ⇒ (m == self) && m.status.isUnavailable }
|
||||
isUnreachable || hasUnavailableMemberStatus
|
||||
}
|
||||
|
||||
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
|
||||
val oldMembersStatus = oldState.latestGossip.members.map(m ⇒ (m.address, m.status))
|
||||
val newMembersStatus = newState.latestGossip.members.map(m ⇒ (m.address, m.status))
|
||||
if (newMembersStatus != oldMembersStatus)
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
|
||||
}
|
||||
|
||||
/**
|
||||
* Looks up and returns the local cluster command connection.
|
||||
*/
|
||||
|
|
@@ -1235,9 +1344,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[akka] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
|
||||
private[cluster] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
|
||||
if (addresses.isEmpty) None
|
||||
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
|
||||
|
||||
|
|
@@ -1280,6 +1389,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
|
||||
def isAvailable: Boolean = clusterNode.isAvailable
|
||||
|
||||
def isRunning: Boolean = clusterNode.isRunning
|
||||
|
||||
// JMX commands
|
||||
|
||||
def join(address: String) = clusterNode.join(AddressFromURIString(address))
|
||||
|
|
@@ -1287,10 +1398,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
|
||||
|
||||
def down(address: String) = clusterNode.down(AddressFromURIString(address))
|
||||
|
||||
def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
|
||||
|
||||
def shutdown() = clusterNode.shutdown()
|
||||
}
|
||||
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)
|
||||
try {
|
||||
|
|
|
|||
|
|
@@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 30 s
|
||||
}""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
|
||||
abstract class LeaderLeavingSpec
|
||||
extends MultiNodeSpec(LeaderLeavingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderLeavingMultiJvmSpec._
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
lazy val thirdAddress = node(third).address
|
||||
|
||||
"A LEADER that is LEAVING" must {
|
||||
|
||||
"be moved to LEAVING, then to EXITING, then to REMOVED, then be shut down and then a new LEADER should be elected" taggedAs LongRunningTest in {
|
||||
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
val oldLeaderAddress = cluster.leader
|
||||
|
||||
if (cluster.isLeader) {
|
||||
|
||||
cluster.leave(oldLeaderAddress)
|
||||
testConductor.enter("leader-left")
|
||||
|
||||
// verify that a NEW LEADER has taken over
|
||||
awaitCond(!cluster.isLeader)
|
||||
|
||||
// verify that the LEADER is shut down
|
||||
awaitCond(!cluster.isRunning, 30.seconds.dilated)
|
||||
|
||||
// verify that the LEADER is REMOVED
|
||||
awaitCond(cluster.status == MemberStatus.Removed)
|
||||
|
||||
} else {
|
||||
|
||||
testConductor.enter("leader-left")
|
||||
|
||||
// verify that the LEADER is LEAVING
|
||||
awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Leaving && m.address == oldLeaderAddress)) // wait on LEAVING
|
||||
|
||||
// verify that the LEADER is EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Exiting && m.address == oldLeaderAddress)) // wait on EXITING
|
||||
|
||||
// verify that the LEADER is no longer part of the 'members' set
|
||||
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))
|
||||
|
||||
// verify that the LEADER is not part of the 'unreachable' set
|
||||
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))
|
||||
|
||||
// verify that we have a new LEADER
|
||||
awaitCond(cluster.leader != oldLeaderAddress)
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
|
|||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
|
||||
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
|
||||
|
|
@@ -36,8 +36,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
|||
|
||||
"A node that is LEAVING a non-singleton cluster" must {
|
||||
|
||||
// FIXME make it work and remove ignore
|
||||
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore {
|
||||
"eventually set to REMOVED by the reaper, and removed from membership ring and seen table" taggedAs LongRunningTest in {
|
||||
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
|
|
@@ -50,13 +49,14 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
|||
// verify that the 'second' node is no longer part of the 'members' set
|
||||
awaitCond(cluster.latestGossip.members.forall(_.address != secondAddress), reaperWaitingTime)
|
||||
|
||||
// verify that the 'second' node is part of the 'unreachable' set
|
||||
awaitCond(cluster.latestGossip.overview.unreachable.exists(_.status == MemberStatus.Removed), reaperWaitingTime)
|
||||
// verify that the 'second' node is not part of the 'unreachable' set
|
||||
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != secondAddress), reaperWaitingTime)
|
||||
}
|
||||
|
||||
// verify node that got removed is 'second' node
|
||||
val isRemoved = cluster.latestGossip.overview.unreachable.find(_.status == MemberStatus.Removed)
|
||||
isRemoved must be('defined)
|
||||
isRemoved.get.address must be(secondAddress)
|
||||
runOn(second) {
|
||||
// verify that the second node is shut down and has status REMOVED
|
||||
awaitCond(!cluster.isRunning, reaperWaitingTime)
|
||||
awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime)
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
|
|
|
|||
|
|
@@ -42,8 +42,7 @@ abstract class NodeLeavingAndExitingSpec
|
|||
|
||||
"A node that is LEAVING a non-singleton cluster" must {
|
||||
|
||||
// FIXME make it work and remove ignore
|
||||
"be moved to EXITING by the leader" taggedAs LongRunningTest ignore {
|
||||
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
|
|
|
|||
|
|
@@ -36,8 +36,7 @@ abstract class NodeLeavingSpec
|
|||
|
||||
"A node that is LEAVING a non-singleton cluster" must {
|
||||
|
||||
// FIXME make it work and remove ignore
|
||||
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest ignore {
|
||||
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
|
||||
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
|
|
|
|||
|
|
@@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.actor.{ Address, AddressFromURIString }
|
||||
import java.net.InetSocketAddress
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.WordSpec
|
||||
import scala.collection.immutable.SortedSet
|
||||
import scala.util.Random
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class MemberOrderingSpec extends WordSpec with MustMatchers {
|
||||
import Member.ordering
|
||||
import Member.addressOrdering
|
||||
import MemberStatus._
|
||||
|
||||
"An Ordering[Member]" must {
|
||||
|
||||
"order non-exiting members by host:port" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)
|
||||
|
||||
val seq = members.toSeq
|
||||
seq.size must equal(3)
|
||||
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
|
||||
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Up))
|
||||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
|
||||
}
|
||||
|
||||
"order exiting members by last" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)
|
||||
|
||||
val seq = members.toSeq
|
||||
seq.size must equal(3)
|
||||
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining))
|
||||
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Up))
|
||||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
|
||||
}
|
||||
|
||||
"order multiple exiting members by last but internally by host:port" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting)
|
||||
|
||||
val seq = members.toSeq
|
||||
seq.size must equal(4)
|
||||
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
|
||||
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving))
|
||||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting))
|
||||
seq(3) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
|
||||
}
|
||||
|
||||
"be sorted by address correctly" in {
|
||||
import Member.ordering
|
||||
// sorting should be done on host and port, only
|
||||
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
|
||||
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
|
||||
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
|
||||
|
||||
val expected = IndexedSeq(m1, m2, m3, m4, m5)
|
||||
val shuffled = Random.shuffle(expected)
|
||||
shuffled.sorted must be(expected)
|
||||
(SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
|
||||
}
|
||||
|
||||
"have stable equals and hashCode" in {
|
||||
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
|
||||
val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
|
||||
m1 must be(m2)
|
||||
m1.hashCode must be(m2.hashCode)
|
||||
|
||||
m3 must not be (m2)
|
||||
m3 must not be (m1)
|
||||
}
|
||||
}
|
||||
|
||||
"An Ordering[Address]" must {
|
||||
|
||||
"order addresses by port" in {
|
||||
val addresses = SortedSet.empty[Address] +
|
||||
AddressFromURIString("akka://sys@darkstar:1112") +
|
||||
AddressFromURIString("akka://sys@darkstar:1113") +
|
||||
AddressFromURIString("akka://sys@darkstar:1110") +
|
||||
AddressFromURIString("akka://sys@darkstar:1111")
|
||||
|
||||
val seq = addresses.toSeq
|
||||
seq.size must equal(4)
|
||||
seq(0) must equal(AddressFromURIString("akka://sys@darkstar:1110"))
|
||||
seq(1) must equal(AddressFromURIString("akka://sys@darkstar:1111"))
|
||||
seq(2) must equal(AddressFromURIString("akka://sys@darkstar:1112"))
|
||||
seq(3) must equal(AddressFromURIString("akka://sys@darkstar:1113"))
|
||||
}
|
||||
|
||||
"order addresses by hostname" in {
|
||||
val addresses = SortedSet.empty[Address] +
|
||||
AddressFromURIString("akka://sys@darkstar2:1110") +
|
||||
AddressFromURIString("akka://sys@darkstar1:1110") +
|
||||
AddressFromURIString("akka://sys@darkstar3:1110") +
|
||||
AddressFromURIString("akka://sys@darkstar0:1110")
|
||||
|
||||
val seq = addresses.toSeq
|
||||
seq.size must equal(4)
|
||||
seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
|
||||
seq(1) must equal(AddressFromURIString("akka://sys@darkstar1:1110"))
|
||||
seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
|
||||
seq(3) must equal(AddressFromURIString("akka://sys@darkstar3:1110"))
|
||||
}
|
||||
|
||||
"order addresses by hostname and port" in {
|
||||
val addresses = SortedSet.empty[Address] +
|
||||
AddressFromURIString("akka://sys@darkstar2:1110") +
|
||||
AddressFromURIString("akka://sys@darkstar0:1111") +
|
||||
AddressFromURIString("akka://sys@darkstar2:1111") +
|
||||
AddressFromURIString("akka://sys@darkstar0:1110")
|
||||
|
||||
val seq = addresses.toSeq
|
||||
seq.size must equal(4)
|
||||
seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
|
||||
seq(1) must equal(AddressFromURIString("akka://sys@darkstar0:1111"))
|
||||
seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
|
||||
seq(3) must equal(AddressFromURIString("akka://sys@darkstar2:1111"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@@ -1,45 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.actor.Address
|
||||
import scala.util.Random
|
||||
import scala.collection.immutable.SortedSet
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class MemberSpec extends WordSpec with MustMatchers {
|
||||
|
||||
"Member" must {
|
||||
|
||||
"be sorted by address correctly" in {
|
||||
import Member.ordering
|
||||
// sorting should be done on host and port, only
|
||||
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
|
||||
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
|
||||
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
|
||||
|
||||
val expected = IndexedSeq(m1, m2, m3, m4, m5)
|
||||
val shuffled = Random.shuffle(expected)
|
||||
shuffled.sorted must be(expected)
|
||||
(SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
|
||||
}
|
||||
|
||||
"have stable equals and hashCode" in {
|
||||
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
|
||||
val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
|
||||
val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
|
||||
|
||||
m1 must be(m2)
|
||||
m1.hashCode must be(m2.hashCode)
|
||||
|
||||
m3 must not be (m2)
|
||||
m3 must not be (m1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
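As an illustrative aside (not part of the patch): the comment in the removed spec states that sorting should be done on host and port only. A minimal ordering in that spirit could look like the sketch below, assuming ``Address`` exposes ``host`` and ``port`` as ``Option`` values; the project's actual ``Member.ordering`` may be implemented differently::

  import akka.actor.Address
  import akka.cluster.Member

  // Illustrative only: order members by hostname, then port, ignoring the
  // protocol and actor system name; the real Member.ordering may differ.
  val byHostAndPort: Ordering[Member] = Ordering.by { m: Member =>
    (m.address.host.getOrElse(""), m.address.port.getOrElse(0))
  }
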
@ -5,8 +5,7 @@
Cluster Specification
######################

.. note:: *This document describes the new clustering coming in Akka Coltrane and
   is not available in the latest stable release)*
.. note:: *This document describes the new clustering coming in Akka Coltrane and is not available in the latest stable release)*

Intro
=====

@ -164,8 +163,8 @@ After gossip convergence a ``leader`` for the cluster can be determined. There i
``leader`` election process, the ``leader`` can always be recognised deterministically
by any node whenever there is gossip convergence. The ``leader`` is simply the first
node in sorted order that is able to take the leadership role, where the only
allowed member states for a ``leader`` are ``up`` or ``leaving`` (see below for more
information about member states).
allowed member states for a ``leader`` are ``up``, ``leaving`` or ``exiting`` (see
below for more information about member states).

The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the

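Reading the leader rule above as "the first member in the sorted order whose status permits leadership", a minimal Scala sketch (not taken from the patch) could look like this, assuming ``Member`` exposes ``address`` and ``status`` and that ``MemberStatus`` defines ``Up``, ``Leaving`` and ``Exiting`` as the specification text implies::

  import akka.actor.Address
  import akka.cluster.{ Member, MemberStatus }
  import scala.collection.immutable.SortedSet

  object LeaderSketch {
    // Statuses assumed to permit taking the leadership role.
    private val canLead: Set[MemberStatus] =
      Set(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting)

    // The leader is the first member, in the members' sorted order, whose
    // status allows it to lead; None if no such member exists.
    def leader(members: SortedSet[Member]): Option[Address] =
      members.collectFirst { case m if canLead(m.status) => m.address }
  }
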
@ -302,10 +301,6 @@ handoff has completed then the node will change to the ``exiting`` state. Once
all nodes have seen the exiting state (convergence) the ``leader`` will remove the
node from the cluster, marking it as ``removed``.

A node can also be removed forcefully by moving it directly to the ``removed``
state using the ``remove`` action. The cluster will rebalance based on the new
cluster membership.

If a node is unreachable then gossip convergence is not possible and therefore
any ``leader`` actions are also not possible (for instance, allowing a node to
become a part of the cluster, or changing actor distribution). To be able to

@ -314,11 +309,12 @@ unreachable node is experiencing only transient difficulties then it can be
explicitly marked as ``down`` using the ``down`` user action. When this node
comes back up and begins gossiping it will automatically go through the joining
process again. If the unreachable node will be permanently down then it can be
removed from the cluster directly with the ``remove`` user action. The cluster
can also *auto-down* a node using the accrual failure detector.
removed from the cluster directly by shutting the actor system down or killing it
through an external ``SIGKILL`` signal, invocation of ``System.exit(status)`` or
similar. The cluster can, through the leader, also *auto-down* a node.

This means that nodes can join and leave the cluster at any point in time,
e.g. provide cluster elasticity.
This means that nodes can join and leave the cluster at any point in time, i.e.
provide cluster elasticity.


State Diagram for the Member States

@ -339,12 +335,12 @@ Member States
- **leaving** / **exiting**
  states during graceful removal

- **removed**
  tombstone state (no longer a member)

- **down**
  marked as down/offline/unreachable

- **removed**
  tombstone state (no longer a member)


User Actions
^^^^^^^^^^^^

@ -359,9 +355,6 @@ User Actions
- **down**
  mark a node as temporarily down

- **remove**
  remove a node from the cluster immediately


Leader Actions
^^^^^^^^^^^^^^

@ -48,14 +48,14 @@ At-most-once
Actual transports may provide stronger semantics,
but at-most-once is the semantics you should expect.
The alternatives would be once-and-only-once, which is extremely costly,
The alternatives would be once-and-only-once, which is extremely costly,
or at-least-once which essentially requires idempotency of message processing,
which is a user-level concern.

Ordering is preserved on a per-sender basis
-------------------------------------------

Actor ``A1` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
Actor ``A1`` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
Actor ``A3`` sends messages ``M4``, ``M5``, ``M6`` to ``A2``

This means that:

@ -66,4 +66,4 @@ This means that:
5) ``A2`` can see messages from ``A1`` interleaved with messages from ``A3``
6) Since there is no guaranteed delivery, none, some or all of the messages may arrive to ``A2``

.. _Erlang documentation: http://www.erlang.org/faq/academic.html
.. _Erlang documentation: http://www.erlang.org/faq/academic.html

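To make the guarantee concrete, here is a small sketch that is not taken from the patch: two actors each send three messages to a third. Each sender's messages are received in send order, but the two streams may interleave, and over a lossy transport some messages may not arrive at all::

  import akka.actor.{ Actor, ActorRef, ActorSystem, Props }

  // Receiver: per-sender order is preserved, interleaving across senders is not.
  class A2 extends Actor {
    def receive = { case msg => println("A2 received: " + msg) }
  }

  // Sends a fixed list of messages to the target on startup.
  class Burst(target: ActorRef, msgs: List[String]) extends Actor {
    override def preStart(): Unit = msgs foreach (target ! _)
    def receive = { case _ => () }
  }

  object PerSenderOrderingDemo extends App {
    val system = ActorSystem("demo")
    val a2 = system.actorOf(Props[A2], "a2")
    system.actorOf(Props(new Burst(a2, List("M1", "M2", "M3"))), "a1")
    system.actorOf(Props(new Burst(a2, List("M4", "M5", "M6"))), "a3")
  }
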
akka-kernel/src/main/dist/bin/akka-cluster

@ -63,20 +63,6 @@ case "$2" in
        $JMX_CLIENT $HOST akka:type=Cluster leave=$ACTOR_SYSTEM_URL
        ;;

    remove)
        if [ $# -ne 3 ]; then
            echo "Usage: $SELF <node-hostname:jmx-port> remove <actor-system-url-to-join>"
            exit 1
        fi

        ensureNodeIsRunningAndAvailable
        shift

        ACTOR_SYSTEM_URL=$2
        echo "Scheduling $ACTOR_SYSTEM_URL to REMOVE"
        $JMX_CLIENT $HOST akka:type=Cluster remove=$ACTOR_SYSTEM_URL
        ;;

    down)
        if [ $# -ne 3 ]; then
            echo "Usage: $SELF <node-hostname:jmx-port> down <actor-system-url-to-join>"

@ -169,19 +155,32 @@ case "$2" in
        $JMX_CLIENT $HOST akka:type=Cluster Available
        ;;

    is-running)
        if [ $# -ne 2 ]; then
            echo "Usage: $SELF <node-hostname:jmx-port> is-running"
            exit 1
        fi

        ensureNodeIsRunningAndAvailable
        shift

        echo "Checking if member node on $HOST is AVAILABLE"
        $JMX_CLIENT $HOST akka:type=Cluster Running
        ;;

    *)
        printf "Usage: bin/$SELF <node-hostname:jmx-port> <command> ...\n"
        printf "\n"
        printf "Supported commands are:\n"
        printf "%26s - %s\n" "join <actor-system-url>" "Sends request a JOIN node with the specified URL"
        printf "%26s - %s\n" "leave <actor-system-url>" "Sends a request for node with URL to LEAVE the cluster"
        printf "%26s - %s\n" "remove <actor-system-url>" "Sends a request for node with URL to be instantly REMOVED from the cluster"
        printf "%26s - %s\n" "down <actor-system-url>" "Sends a request for marking node with URL as DOWN"
        printf "%26s - %s\n" member-status "Asks the member node for its current status"
        printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
        printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
        printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
        printf "%26s - %s\n" is-available "Checks if the member node is available"
        printf "%26s - %s\n" is-running "Checks if the member node is running"
        printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
        printf "Where the <actor-system-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
        printf "\n"

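The same JMX interface the script drives can also be reached from code. The sketch below is illustrative only: the host, JMX port and node URL are placeholders, and it assumes the ``akka:type=Cluster`` MBean exposes a ``down`` operation taking the node URL as a ``String``, mirroring the ``down`` sub-command above; verify the operation name and signature against the actual MBean before relying on it::

  import javax.management.ObjectName
  import javax.management.remote.{ JMXConnectorFactory, JMXServiceURL }

  object ClusterJmxDown extends App {
    // Placeholder host, JMX port and node URL; adjust to your deployment.
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://darkstar:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val server = connector.getMBeanServerConnection
      val bean = new ObjectName("akka:type=Cluster")
      // Assumed operation name and signature; check against the actual MBean.
      server.invoke(bean, "down",
        Array[AnyRef]("akka://MySystem@darkstar:2552"),
        Array("java.lang.String"))
    } finally connector.close()
  }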