Change cluster MemberEvents to only be published on convergence. See #2692

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
	akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala
	akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
	akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
	akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
	akka-docs/rst/cluster/cluster-usage-java.rst
	akka-docs/rst/cluster/cluster-usage-scala.rst
	akka-kernel/src/main/dist/bin/akka-cluster
This commit is contained in:
Björn Antonsson 2012-11-27 18:07:37 +01:00
parent 0c9ad2f791
commit a03460329d
21 changed files with 327 additions and 304 deletions

View file

@ -120,10 +120,12 @@ private[cluster] object InternalClusterAction {
*/ */
case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip) sealed trait PublishMessage
case class PublishEvent(event: ClusterDomainEvent) case class PublishChanges(newGossip: Gossip) extends PublishMessage
case object PublishDone case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
case object PublishStart extends PublishMessage
case object PublishDone extends PublishMessage
case object PublishDoneFinished extends PublishMessage
} }
/** /**
@ -280,14 +282,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*/ */
def join(address: Address): Unit = { def join(address: Address): Unit = {
if (!latestGossip.members.exists(_.address == address)) { if (!latestGossip.members.exists(_.address == address)) {
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty // wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip() latestGossip = Gossip()
// wipe the failure detector since we are starting fresh and shouldn't care about the past // wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset() failureDetector.reset()
// wipe the publisher since we are starting fresh
publisher ! PublishStart
publish(localGossip) publish(latestGossip)
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
context.become(initialized) context.become(initialized)
@ -302,18 +304,16 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* State transition to JOINING - new node joining. * State transition to JOINING - new node joining.
*/ */
def joining(node: Address): Unit = { def joining(node: Address): Unit = {
val localGossip = latestGossip val localMembers = latestGossip.members
val localMembers = localGossip.members val localUnreachable = latestGossip.overview.unreachable
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node) val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localGossip.overview.isNonDownUnreachable(node) val isUnreachable = latestGossip.overview.isNonDownUnreachable(node)
if (!alreadyMember && !isUnreachable) { if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers)
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster // remove the node from the failure detector if it is a DOWN node that is rejoining cluster
if (rejoiningMember.nonEmpty) failureDetector.remove(node) if (rejoiningMember.nonEmpty) failureDetector.remove(node)
@ -321,7 +321,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// add joining node as Joining // add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates) // add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) val newGossip = latestGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress val seenVersionedGossip = versionedGossip seen selfAddress
@ -335,7 +335,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
gossipTo(node) gossipTo(node)
} }
publish(localGossip) publish(latestGossip)
} }
} }
@ -343,10 +343,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* State transition to LEAVING. * State transition to LEAVING.
*/ */
def leaving(address: Address): Unit = { def leaving(address: Address): Unit = {
val localGossip = latestGossip if (latestGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) val newMembers = latestGossip.members map { member if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newMembers = localGossip.members map { member if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING val newGossip = latestGossip copy (members = newMembers)
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress val seenVersionedGossip = versionedGossip seen selfAddress
@ -354,7 +353,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
latestGossip = seenVersionedGossip latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
publish(localGossip) publish(latestGossip)
} }
} }
@ -377,10 +376,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*/ */
def removing(address: Address): Unit = { def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
val localGossip = latestGossip
// just cleaning up the gossip state // just cleaning up the gossip state
latestGossip = Gossip() latestGossip = Gossip()
publish(localGossip) publish(latestGossip)
context.become(removed) context.become(removed)
// make sure the final (removed) state is published // make sure the final (removed) state is published
// before shutting down // before shutting down
@ -435,7 +433,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val versionedGossip = newGossip :+ vclockNode val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfAddress latestGossip = versionedGossip seen selfAddress
publish(localGossip) publish(latestGossip)
} }
/** /**
@ -524,7 +522,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} }
stats = stats.incrementReceivedGossipCount stats = stats.incrementReceivedGossipCount
publish(localGossip) publish(latestGossip)
if (envelope.conversation && if (envelope.conversation &&
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
@ -730,7 +728,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
} }
publish(localGossip) publish(latestGossip)
} }
} }
} }
@ -767,7 +765,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
publish(localGossip) publish(latestGossip)
} }
} }
} }
@ -805,8 +803,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg) coreSender ! SendClusterMessage(address, gossipMsg)
def publish(oldGossip: Gossip): Unit = { def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(oldGossip, latestGossip) publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats() if (PublishStatsInterval == Duration.Zero) publishInternalStats()
} }

View file

@ -5,6 +5,7 @@ package akka.cluster
import language.postfixOps import language.postfixOps
import scala.collection.immutable import scala.collection.immutable
import scala.collection.immutable.{ VectorBuilder, SortedSet }
import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._ import akka.cluster.MemberStatus._
@ -33,7 +34,6 @@ object ClusterEvent {
case class CurrentClusterState( case class CurrentClusterState(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty, members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty, unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty, seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent { leader: Option[Address] = None) extends ClusterDomainEvent {
@ -75,57 +75,47 @@ object ClusterEvent {
} }
/** /**
* A new member joined the cluster. * A new member joined the cluster. Only published after convergence.
*/ */
case class MemberJoined(member: Member) extends MemberEvent { case class MemberJoined(member: Member) extends MemberEvent {
if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
} }
/** /**
* Member status changed to Up * Member status changed to Up. Only published after convergence.
*/ */
case class MemberUp(member: Member) extends MemberEvent { case class MemberUp(member: Member) extends MemberEvent {
if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
} }
/** /**
* Member status changed to Leaving * Member status changed to Leaving. Only published after convergence.
*/ */
case class MemberLeft(member: Member) extends MemberEvent { case class MemberLeft(member: Member) extends MemberEvent {
if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
} }
/** /**
* Member status changed to Exiting * Member status changed to Exiting. Only published after convergence.
*/ */
case class MemberExited(member: Member) extends MemberEvent { case class MemberExited(member: Member) extends MemberEvent {
if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
} }
/** /**
* A member is considered as unreachable by the failure detector. * Member status changed to Down. Only published after convergence.
*/
case class MemberUnreachable(member: Member) extends MemberEvent
/**
* Member status changed to Down
*/ */
case class MemberDowned(member: Member) extends MemberEvent { case class MemberDowned(member: Member) extends MemberEvent {
if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member)
} }
/** /**
* Member completely removed from the cluster * Member completely removed from the cluster. Only published after convergence.
*/ */
case class MemberRemoved(member: Member) extends MemberEvent { case class MemberRemoved(member: Member) extends MemberEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
} }
/**
* Cluster convergence state changed.
*/
case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent
/** /**
* Leader of the cluster members changed. Only published after convergence. * Leader of the cluster members changed. Only published after convergence.
*/ */
@ -138,6 +128,12 @@ object ClusterEvent {
} }
/** /**
* A member is considered as unreachable by the failure detector.
*/
case class UnreachableMember(member: Member) extends ClusterDomainEvent
/**
* INTERNAL API
* *
* Current snapshot of cluster node metrics. Published to subscribers. * Current snapshot of cluster node metrics. Published to subscribers.
*/ */
@ -163,29 +159,42 @@ object ClusterEvent {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[cluster] def diff(oldGossip: Gossip, newGossip: Gossip): immutable.IndexedSeq[ClusterDomainEvent] = { private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] =
val newMembers = newGossip.members -- oldGossip.members if (newGossip eq oldGossip) Nil
else {
val newUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable
val unreachableEvents = newUnreachable map UnreachableMember
val membersGroupedByAddress = (newGossip.members.toList ++ oldGossip.members.toList).groupBy(_.address) immutable.Seq.empty ++ unreachableEvents
}
/**
* INTERNAL API.
*/
private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] =
if (newGossip eq oldGossip) Nil
else {
val newMembers = newGossip.members -- oldGossip.members
val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.address)
val changedMembers = membersGroupedByAddress collect { val changedMembers = membersGroupedByAddress collect {
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status newMember case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status newMember
} }
val memberEvents = (newMembers ++ changedMembers) map { m val memberEvents = (newMembers ++ changedMembers) map { m
if (m.status == Joining) MemberJoined(m) m.status match {
else if (m.status == Up) MemberUp(m) case Joining MemberJoined(m)
else if (m.status == Leaving) MemberLeft(m) case Up MemberUp(m)
else if (m.status == Exiting) MemberExited(m) case Leaving MemberLeft(m)
else throw new IllegalStateException("Unexpected member status: " + m) case Exiting MemberExited(m)
case _ throw new IllegalStateException("Unexpected member status: " + m)
}
} }
val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable
val (newDowned, newUnreachable) = allNewUnreachable partition { _.status == Down } val newDowned = allNewUnreachable filter { _.status == Down }
val downedEvents = newDowned map MemberDowned val downedEvents = newDowned map MemberDowned
val unreachableEvents = newUnreachable map MemberUnreachable
val unreachableGroupedByAddress = val unreachableGroupedByAddress =
(newGossip.overview.unreachable.toList ++ oldGossip.overview.unreachable.toList).groupBy(_.address) List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.address)
val unreachableDownMembers = unreachableGroupedByAddress collect { val unreachableDownMembers = unreachableGroupedByAddress collect {
case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status
newMember newMember
@ -196,23 +205,29 @@ object ClusterEvent {
MemberRemoved(m.copy(status = Removed)) MemberRemoved(m.copy(status = Removed))
} }
val newConvergence = newGossip.convergence (new VectorBuilder[MemberEvent]() ++= memberEvents ++= downedEvents ++= unreachableDownedEvents
val convergenceChanged = newConvergence != oldGossip.convergence ++= removedEvents).result()
val convergenceEvents = if (convergenceChanged) List(ConvergenceChanged(newConvergence)) else EmptyImmutableSeq
val leaderEvents =
if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader))
else EmptyImmutableSeq
val newSeenBy = newGossip.seenBy
val seenEvents =
if (convergenceChanged || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy))
else EmptyImmutableSeq
memberEvents.toVector ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++
leaderEvents ++ convergenceEvents ++ seenEvents
} }
/**
* INTERNAL API
*/
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] =
if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader))
else Nil
/**
* INTERNAL API
*/
private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[SeenChanged] =
if (newGossip eq oldGossip) Nil
else {
val newConvergence = newGossip.convergence
val newSeenBy = newGossip.seenBy
if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy)
List(SeenChanged(newConvergence, newSeenBy))
else Nil
}
} }
/** /**
@ -224,34 +239,30 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
import InternalClusterAction._ import InternalClusterAction._
var latestGossip: Gossip = Gossip() var latestGossip: Gossip = Gossip()
var latestConvergedGossip: Gossip = Gossip()
// Keep track of LeaderChanged event. Should not be published until var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty
// convergence, and it should only be published when leader actually
// changed to another node. 3 states:
// - None: No LeaderChanged detected yet, nothing published yet
// - Some(Left): Stashed LeaderChanged to be published later, when convergence
// - Some(Right): Latest published LeaderChanged
var leaderChangedState: Option[Either[LeaderChanged, LeaderChanged]] = None
def receive = { def receive = {
case PublishChanges(oldGossip, newGossip) publishChanges(oldGossip, newGossip) case PublishChanges(newGossip) publishChanges(newGossip)
case currentStats: CurrentInternalStats publishInternalStats(currentStats) case currentStats: CurrentInternalStats publishInternalStats(currentStats)
case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver) case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver)
case Subscribe(subscriber, to) subscribe(subscriber, to) case Subscribe(subscriber, to) subscribe(subscriber, to)
case Unsubscribe(subscriber, to) unsubscribe(subscriber, to) case Unsubscribe(subscriber, to) unsubscribe(subscriber, to)
case PublishEvent(event) publish(event) case PublishEvent(event) publish(event)
case PublishDone sender ! PublishDone case PublishStart publishStart()
case PublishDone publishDone(sender)
} }
def eventStream: EventStream = context.system.eventStream def eventStream: EventStream = context.system.eventStream
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
// The state is a mix of converged and latest gossip to mimic what you
// would have seen if you where listening to the events.
val state = CurrentClusterState( val state = CurrentClusterState(
members = latestGossip.members, members = latestConvergedGossip.members,
unreachable = latestGossip.overview.unreachable, unreachable = latestGossip.overview.unreachable,
convergence = latestGossip.convergence,
seenBy = latestGossip.seenBy, seenBy = latestGossip.seenBy,
leader = latestGossip.leader) leader = latestConvergedGossip.leader)
receiver match { receiver match {
case Some(ref) ref ! state case Some(ref) ref ! state
case None publish(state) case None publish(state)
@ -268,54 +279,43 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case Some(c) eventStream.unsubscribe(subscriber, c) case Some(c) eventStream.unsubscribe(subscriber, c)
} }
def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { def publishChanges(newGossip: Gossip): Unit = {
val oldGossip = latestGossip
// keep the latestGossip to be sent to new subscribers // keep the latestGossip to be sent to new subscribers
latestGossip = newGossip latestGossip = newGossip
diff(oldGossip, newGossip) foreach { event // first publish the diffUnreachable between the last two gossips
event match { diffUnreachable(oldGossip, newGossip) foreach { event
case x @ LeaderChanged(_) if leaderChangedState == Some(Right(x)) publish(event)
// skip, this leader has already been published // notify DeathWatch about unreachable node
publish(AddressTerminated(event.member.address))
case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence
// leader changed and immediate convergence
leaderChangedState = Some(Right(x))
publish(x)
case x: LeaderChanged
// publish later, when convergence
leaderChangedState = Some(Left(x))
case ConvergenceChanged(true)
// now it's convergence, publish eventual stashed LeaderChanged event
leaderChangedState match {
case Some(Left(x))
leaderChangedState = Some(Right(x))
publish(x)
case _ // nothing stashed
}
publish(event)
case MemberDowned(m)
// TODO this case might be collapsed with MemberRemoved, see ticket #2788
// but right now we don't change Downed to Removed
publish(event)
// notify DeathWatch about downed node
publish(AddressTerminated(m.address))
case MemberRemoved(m)
publish(event)
// notify DeathWatch about removed node
publish(AddressTerminated(m.address))
case _
// all other events
publish(event)
} }
// buffer up the MemberEvents waiting for convergence
memberEvents ++= diffMemberEvents(oldGossip, newGossip)
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip
latestConvergedGossip = newGossip
memberEvents foreach publish
memberEvents = immutable.Seq.empty
diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish
} }
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish
} }
def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)
def publish(event: AnyRef): Unit = eventStream publish event def publish(event: AnyRef): Unit = eventStream publish event
def publishStart(): Unit = clearState()
def publishDone(receiver: ActorRef): Unit = {
clearState()
receiver ! PublishDoneFinished
}
def clearState(): Unit = {
latestGossip = Gossip()
latestConvergedGossip = Gossip()
}
} }

View file

@ -96,7 +96,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval, val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval,
HeartbeatInterval, self, HeartbeatTick) HeartbeatInterval, self, HeartbeatTick)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}
override def postStop(): Unit = { override def postStop(): Unit = {
heartbeatTask.cancel() heartbeatTask.cancel()
@ -112,7 +115,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def receive = { def receive = {
case HeartbeatTick heartbeat() case HeartbeatTick heartbeat()
case s: CurrentClusterState reset(s) case s: CurrentClusterState reset(s)
case MemberUnreachable(m) removeMember(m) case UnreachableMember(m) removeMember(m)
case MemberRemoved(m) removeMember(m) case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member) case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) addJoinInProgress(a, d) case JoinInProgress(a, d) addJoinInProgress(a, d)

View file

@ -77,6 +77,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
override def preStart(): Unit = { override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
log.info("Metrics collection has started successfully on node [{}]", selfAddress) log.info("Metrics collection has started successfully on node [{}]", selfAddress)
} }
@ -85,7 +86,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case MetricsTick collect() case MetricsTick collect()
case state: CurrentClusterState receiveState(state) case state: CurrentClusterState receiveState(state)
case MemberUp(m) addMember(m) case MemberUp(m) addMember(m)
case e: MemberEvent removeMember(e) case e: MemberEvent removeMember(e.member)
case UnreachableMember(m) removeMember(m)
case msg: MetricsGossipEnvelope receiveGossip(msg) case msg: MetricsGossipEnvelope receiveGossip(msg)
} }
@ -104,9 +106,9 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/** /**
* Removes a member from the member node ring. * Removes a member from the member node ring.
*/ */
def removeMember(event: MemberEvent): Unit = { def removeMember(member: Member): Unit = {
nodes -= event.member.address nodes -= member.address
latestGossip = latestGossip remove event.member.address latestGossip = latestGossip remove member.address
publish() publish()
} }

View file

@ -45,11 +45,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)
def receive = { def receive = {
case e: ClusterDomainEvent e match {
case SeenChanged(convergence, seenBy) case SeenChanged(convergence, seenBy)
state = state.copy(convergence = convergence, seenBy = seenBy) state = state.copy(seenBy = seenBy)
case MemberRemoved(member) case MemberRemoved(member)
state = state.copy(members = state.members - member, unreachable = state.unreachable - member) state = state.copy(members = state.members - member, unreachable = state.unreachable - member)
case MemberUnreachable(member) case UnreachableMember(member)
// replace current member with new member (might have different status, only address is used in equals) // replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member)
case MemberDowned(member) case MemberDowned(member)
@ -59,11 +60,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
// replace current member with new member (might have different status, only address is used in equals) // replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - event.member + event.member) state = state.copy(members = state.members - event.member + event.member)
case LeaderChanged(leader) state = state.copy(leader = leader) case LeaderChanged(leader) state = state.copy(leader = leader)
case ConvergenceChanged(convergence) state = state.copy(convergence = convergence)
case s: CurrentClusterState state = s case s: CurrentClusterState state = s
case CurrentInternalStats(stats) _latestStats = stats case CurrentInternalStats(stats) _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case _ // ignore, not interesting }
} }
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")
} }
@ -112,11 +112,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
*/ */
def isSingletonCluster: Boolean = members.size == 1 def isSingletonCluster: Boolean = members.size == 1
/**
* Checks if we have a cluster convergence.
*/
def convergence: Boolean = state.convergence
/** /**
* Returns true if the node is not unreachable and not `Down` * Returns true if the node is not unreachable and not `Down`
* and not `Removed`. * and not `Removed`.

View file

@ -135,7 +135,7 @@ private[cluster] case class Gossip(
* Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence - * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence -
* waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down). * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down).
* *
* @return Some(convergedGossip) if convergence have been reached and None if not * @return true if convergence have been reached and false if not
*/ */
def convergence: Boolean = { def convergence: Boolean = {
val unreachable = overview.unreachable val unreachable = overview.unreachable
@ -151,8 +151,10 @@ private[cluster] case class Gossip(
def allMembersInSeen = members.forall(m seen.contains(m.address)) def allMembersInSeen = members.forall(m seen.contains(m.address))
def seenSame: Boolean = def seenSame: Boolean =
if (seen.isEmpty) false if (seen.isEmpty) {
else { // if both seen and members are empty, then every(no)body has seen the same thing
members.isEmpty
} else {
val values = seen.values val values = seen.values
val seenHead = values.head val seenHead = values.head
values.forall(_ == seenHead) values.forall(_ == seenHead)

View file

@ -160,7 +160,7 @@ case class VectorClock(
* Compare two vector clocks. The outcomes will be one of the following: * Compare two vector clocks. The outcomes will be one of the following:
* <p/> * <p/>
* {{{ * {{{
* 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j). * 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c2(i) and there does not exist a j such that c1(j) > c2(j).
* 2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). * 2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
* 3. Clock 1 is AFTER (<) Clock 2 otherwise. * 3. Clock 1 is AFTER (<) Clock 2 otherwise.
* }}} * }}}

View file

@ -248,9 +248,11 @@ private[akka] class ClusterRouteeProvider(
*/ */
private[akka] class ClusterRouterActor extends Router { private[akka] class ClusterRouterActor extends Router {
// subscribe to cluster changes, MemberEvent
// re-subscribe when restart // re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)
// lazy to not interfere with RoutedActorCell initialization // lazy to not interfere with RoutedActorCell initialization
@ -264,6 +266,19 @@ private[akka] class ClusterRouterActor extends Router {
def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef) def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef)
def unregisterRoutees(member: Member) = {
val address = member.address
routeeProvider.nodes -= address
// unregister routees that live on that node
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)
routeeProvider.unregisterRoutees(affectedRoutes)
// createRoutees will not create more than createRoutees and maxInstancesPerNode
// this is useful when totalInstances < upNodes.size
routeeProvider.createRoutees()
}
override def routerReceive: Receive = { override def routerReceive: Receive = {
case s: CurrentClusterState case s: CurrentClusterState
import Member.addressOrdering import Member.addressOrdering
@ -278,17 +293,10 @@ private[akka] class ClusterRouterActor extends Router {
case other: MemberEvent case other: MemberEvent
// other events means that it is no longer interesting, such as // other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved // MemberJoined, MemberLeft, MemberExited, MemberRemoved
val address = other.member.address unregisterRoutees(other.member)
routeeProvider.nodes -= address
// unregister routees that live on that node
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)
routeeProvider.unregisterRoutees(affectedRoutes)
// createRoutees will not create more than createRoutees and maxInstancesPerNode
// this is useful when totalInstances < upNodes.size
routeeProvider.createRoutees()
case UnreachableMember(m)
unregisterRoutees(m)
} }
} }

View file

@ -76,8 +76,6 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
// still one unreachable // still one unreachable
clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(thirdAddress) clusterView.unreachableMembers.head.address must be(thirdAddress)
// and therefore no convergence
clusterView.convergence must be(false)
} }
} }
@ -94,23 +92,33 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
def memberStatus(address: Address): Option[MemberStatus] = def memberStatus(address: Address): Option[MemberStatus] =
clusterView.members.collectFirst { case m if m.address == address m.status } clusterView.members.collectFirst { case m if m.address == address m.status }
def assertNotMovedUp: Unit = { def assertNotMovedUp(joining: Boolean): Unit = {
within(20 seconds) { within(20 seconds) {
awaitCond(clusterView.members.size == 3) if (joining) awaitCond(clusterView.members.size == 0)
else awaitCond(clusterView.members.size == 2)
awaitSeenSameState(first, second, fourth) awaitSeenSameState(first, second, fourth)
memberStatus(first) must be(Some(MemberStatus.Up)) if (joining) memberStatus(first) must be(None)
memberStatus(second) must be(Some(MemberStatus.Up)) else memberStatus(first) must be(Some(MemberStatus.Up))
if (joining) memberStatus(second) must be(None)
else memberStatus(second) must be(Some(MemberStatus.Up))
// leader is not allowed to move the new node to Up // leader is not allowed to move the new node to Up
memberStatus(fourth) must be(Some(MemberStatus.Joining)) memberStatus(fourth) must be(None)
// still no convergence
clusterView.convergence must be(false)
} }
} }
runOn(first, second, fourth) { enterBarrier("after-join")
runOn(first, second) {
for (n 1 to 5) { for (n 1 to 5) {
log.debug("assertNotMovedUp#" + n) assertNotMovedUp(joining = false)
assertNotMovedUp // wait and then check again
Thread.sleep(1.second.dilated.toMillis)
}
}
runOn(fourth) {
for (n 1 to 5) {
assertNotMovedUp(joining = true)
// wait and then check again // wait and then check again
Thread.sleep(1.second.dilated.toMillis) Thread.sleep(1.second.dilated.toMillis)
} }

View file

@ -181,7 +181,6 @@ abstract class LargeClusterSpec
Await.ready(latch, remaining) Await.ready(latch, remaining)
awaitCond(clusterNodes.forall(_.readView.convergence))
val counts = clusterNodes.map(gossipCount(_)) val counts = clusterNodes.map(gossipCount(_))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max)
log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node",
@ -278,7 +277,7 @@ abstract class LargeClusterSpec
} }
"detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest in { "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest in {
val unreachableNodes = nodesPerDatacenter val downedNodes = nodesPerDatacenter
val liveNodes = nodesPerDatacenter * 4 val liveNodes = nodesPerDatacenter * 4
within(30.seconds + 3.seconds * liveNodes) { within(30.seconds + 3.seconds * liveNodes) {
@ -293,22 +292,19 @@ abstract class LargeClusterSpec
val latch = TestLatch(nodesPerDatacenter) val latch = TestLatch(nodesPerDatacenter)
systems foreach { sys systems foreach { sys
Cluster(sys).subscribe(sys.actorOf(Props(new Actor { Cluster(sys).subscribe(sys.actorOf(Props(new Actor {
var gotUnreachable = Set.empty[Member] var gotDowned = Set.empty[Member]
def receive = { def receive = {
case state: CurrentClusterState case state: CurrentClusterState
gotUnreachable = state.unreachable gotDowned = gotDowned ++ state.unreachable.filter(_.status == Down)
checkDone()
case MemberUnreachable(m) if !latch.isOpen
gotUnreachable = gotUnreachable + m
checkDone() checkDone()
case MemberDowned(m) if !latch.isOpen case MemberDowned(m) if !latch.isOpen
gotUnreachable = gotUnreachable + m gotDowned = gotDowned + m
checkDone() checkDone()
case _ // not interesting case _ // not interesting
} }
def checkDone(): Unit = if (gotUnreachable.size == unreachableNodes) { def checkDone(): Unit = if (gotDowned.size == downedNodes) {
log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", log.info("Detected [{}] downed nodes in [{}], it took [{}], received [{}] gossip messages",
unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) downedNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys)))
latch.countDown() latch.countDown()
} }
})), classOf[ClusterDomainEvent]) })), classOf[ClusterDomainEvent])
@ -322,7 +318,6 @@ abstract class LargeClusterSpec
runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
Await.ready(latch, remaining) Await.ready(latch, remaining)
awaitCond(systems.forall(Cluster(_).readView.convergence))
val mergeCount = systems.map(sys Cluster(sys).readView.latestStats.mergeCount).sum val mergeCount = systems.map(sys Cluster(sys).readView.latestStats.mergeCount).sum
val counts = systems.map(sys gossipCount(Cluster(sys))) val counts = systems.map(sys gossipCount(Cluster(sys)))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)

View file

@ -223,7 +223,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
within(timeout) { within(timeout) {
awaitCond(clusterView.members.size == numberOfMembers) awaitCond(clusterView.members.size == numberOfMembers)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
awaitCond(clusterView.convergence)
// clusterView.leader is updated by LeaderChanged, await that to be updated also // clusterView.leader is updated by LeaderChanged, await that to be updated also
val expectedLeader = clusterView.members.headOption.map(_.address) val expectedLeader = clusterView.members.headOption.map(_.address)
awaitCond(clusterView.leader == expectedLeader) awaitCond(clusterView.leader == expectedLeader)

View file

@ -43,7 +43,6 @@ abstract class NodeMembershipSpec
awaitCond { awaitCond {
clusterView.members.forall(_.status == MemberStatus.Up) clusterView.members.forall(_.status == MemberStatus.Up)
} }
awaitCond(clusterView.convergence)
} }
enterBarrier("after-1") enterBarrier("after-1")
@ -60,7 +59,6 @@ abstract class NodeMembershipSpec
awaitCond { awaitCond {
clusterView.members.forall(_.status == MemberStatus.Up) clusterView.members.forall(_.status == MemberStatus.Up)
} }
awaitCond(clusterView.convergence)
enterBarrier("after-2") enterBarrier("after-2")
} }

View file

@ -114,7 +114,6 @@ abstract class TransitionSpec
startClusterNode() startClusterNode()
awaitCond(clusterView.isSingletonCluster) awaitCond(clusterView.isSingletonCluster)
awaitMemberStatus(myself, Joining) awaitMemberStatus(myself, Joining)
awaitCond(clusterView.convergence)
leaderActions() leaderActions()
awaitMemberStatus(myself, Up) awaitMemberStatus(myself, Up)
} }
@ -133,14 +132,13 @@ abstract class TransitionSpec
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Joining) awaitMemberStatus(second, Joining)
awaitCond(seenLatestGossip == Set(first, second)) awaitCond(seenLatestGossip == Set(first, second))
clusterView.convergence must be(true)
} }
enterBarrier("convergence-joining-2") enterBarrier("convergence-joining-2")
runOn(leader(first, second)) { runOn(leader(first, second)) {
leaderActions() leaderActions()
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Joining)
} }
enterBarrier("leader-actions-2") enterBarrier("leader-actions-2")
@ -150,7 +148,6 @@ abstract class TransitionSpec
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitCond(seenLatestGossip == Set(first, second)) awaitCond(seenLatestGossip == Set(first, second))
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
clusterView.convergence must be(true)
} }
enterBarrier("after-2") enterBarrier("after-2")
@ -163,10 +160,7 @@ abstract class TransitionSpec
} }
runOn(second, third) { runOn(second, third) {
// gossip chat from the join will synchronize the views // gossip chat from the join will synchronize the views
awaitMembers(first, second, third)
awaitMemberStatus(third, Joining)
awaitCond(seenLatestGossip == Set(second, third)) awaitCond(seenLatestGossip == Set(second, third))
clusterView.convergence must be(false)
} }
enterBarrier("third-joined-second") enterBarrier("third-joined-second")
@ -177,7 +171,6 @@ abstract class TransitionSpec
awaitMemberStatus(third, Joining) awaitMemberStatus(third, Joining)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitCond(seenLatestGossip == Set(first, second, third))
clusterView.convergence must be(true)
} }
first gossipTo third first gossipTo third
@ -187,7 +180,6 @@ abstract class TransitionSpec
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitMemberStatus(third, Joining) awaitMemberStatus(third, Joining)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitCond(seenLatestGossip == Set(first, second, third))
clusterView.convergence must be(true)
} }
enterBarrier("convergence-joining-3") enterBarrier("convergence-joining-3")
@ -196,16 +188,15 @@ abstract class TransitionSpec
leaderActions() leaderActions()
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitMemberStatus(third, Up) awaitMemberStatus(third, Joining)
} }
enterBarrier("leader-actions-3") enterBarrier("leader-actions-3")
// leader gossipTo first non-leader // leader gossipTo first non-leader
leader(first, second, third) gossipTo nonLeader(first, second, third).head leader(first, second, third) gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) { runOn(nonLeader(first, second, third).head) {
awaitMemberStatus(third, Up) awaitMemberStatus(third, Joining)
awaitCond(seenLatestGossip == Set(leader(first, second, third), myself)) awaitCond(seenLatestGossip == Set(leader(first, second, third), myself))
clusterView.convergence must be(false)
} }
// first non-leader gossipTo the other non-leader // first non-leader gossipTo the other non-leader
@ -217,7 +208,6 @@ abstract class TransitionSpec
runOn(nonLeader(first, second, third).tail.head) { runOn(nonLeader(first, second, third).tail.head) {
awaitMemberStatus(third, Up) awaitMemberStatus(third, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitCond(seenLatestGossip == Set(first, second, third))
clusterView.convergence must be(true)
} }
// first non-leader gossipTo the leader // first non-leader gossipTo the leader
@ -227,7 +217,6 @@ abstract class TransitionSpec
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitMemberStatus(third, Up) awaitMemberStatus(third, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitCond(seenLatestGossip == Set(first, second, third))
clusterView.convergence must be(true)
} }
enterBarrier("after-3") enterBarrier("after-3")
@ -247,12 +236,10 @@ abstract class TransitionSpec
runOn(first, third) { runOn(first, third) {
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up))) awaitCond(clusterView.unreachableMembers.contains(Member(second, Up)))
awaitCond(!clusterView.convergence)
} }
runOn(first) { runOn(first) {
cluster.down(second) cluster.down(second)
awaitMemberStatus(second, Down)
} }
enterBarrier("after-second-down") enterBarrier("after-second-down")
@ -263,7 +250,6 @@ abstract class TransitionSpec
awaitCond(clusterView.unreachableMembers.contains(Member(second, Down))) awaitCond(clusterView.unreachableMembers.contains(Member(second, Down)))
awaitMemberStatus(second, Down) awaitMemberStatus(second, Down)
awaitCond(seenLatestGossip == Set(first, third)) awaitCond(seenLatestGossip == Set(first, third))
clusterView.convergence must be(true)
} }
enterBarrier("after-6") enterBarrier("after-6")

View file

@ -94,7 +94,6 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
members.forall(_.status == MemberStatus.Up) members.forall(_.status == MemberStatus.Up)
}) })
clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
clusterView.convergence must be(false)
} }
} }
@ -112,8 +111,6 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
// still one unreachable // still one unreachable
clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(node(victim).address) clusterView.unreachableMembers.head.address must be(node(victim).address)
// and therefore no convergence
clusterView.convergence must be(false)
} }
} }

View file

@ -42,6 +42,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
override def beforeEach(): Unit = { override def beforeEach(): Unit = {
publisher = system.actorOf(Props[ClusterDomainEventPublisher]) publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
expectMsg(MemberUp(a1))
expectMsg(LeaderChanged(Some(a1.address)))
expectMsgType[SeenChanged]
} }
override def afterEach(): Unit = { override def afterEach(): Unit = {
@ -50,59 +54,63 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"ClusterDomainEventPublisher" must { "ClusterDomainEventPublisher" must {
"publish MemberUp when member status changed to Up" in { "not publish MemberUp when there is no convergence" in {
publisher ! PublishChanges(g1, g2) publisher ! PublishChanges(g2)
expectMsg(MemberUp(c2))
expectMsg(ConvergenceChanged(false))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
"publish convergence true when all seen it" in { "publish MemberEvents when there is convergence" in {
publisher ! PublishChanges(g2, g3) publisher ! PublishChanges(g2)
expectMsg(ConvergenceChanged(true)) expectMsgType[SeenChanged]
publisher ! PublishChanges(g3)
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
"publish leader changed when new leader after convergence" in { "publish leader changed when new leader after convergence" in {
publisher ! PublishChanges(g3, g4) publisher ! PublishChanges(g4)
expectMsg(MemberUp(d1))
expectMsg(ConvergenceChanged(false))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
expectNoMsg(1 second) expectNoMsg(1 second)
publisher ! PublishChanges(g4, g5) publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1))
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsg(LeaderChanged(Some(d1.address))) expectMsg(LeaderChanged(Some(d1.address)))
expectMsg(ConvergenceChanged(true))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
"publish leader changed when new leader and convergence both before and after" in { "publish leader changed when new leader and convergence both before and after" in {
// convergence both before and after // convergence both before and after
publisher ! PublishChanges(g3, g5) publisher ! PublishChanges(g3)
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1)) expectMsg(MemberUp(d1))
expectMsg(LeaderChanged(Some(d1.address))) expectMsg(LeaderChanged(Some(d1.address)))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
"not publish leader changed when not convergence" in { "not publish leader changed when not convergence" in {
publisher ! PublishChanges(g2, g4) publisher ! PublishChanges(g4)
expectMsg(MemberUp(d1)) expectMsgType[SeenChanged]
expectNoMsg(1 second) expectNoMsg(1 second)
} }
"not publish leader changed when changed convergence but still same leader" in { "not publish leader changed when changed convergence but still same leader" in {
publisher ! PublishChanges(g2, g5) publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1)) expectMsg(MemberUp(d1))
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsg(LeaderChanged(Some(d1.address))) expectMsg(LeaderChanged(Some(d1.address)))
expectMsg(ConvergenceChanged(true))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
publisher ! PublishChanges(g5, g4) publisher ! PublishChanges(g4)
expectMsg(ConvergenceChanged(false))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
publisher ! PublishChanges(g4, g5) publisher ! PublishChanges(g5)
expectMsg(ConvergenceChanged(true))
expectMsgType[SeenChanged] expectMsgType[SeenChanged]
} }
@ -119,12 +127,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent])) publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent]))
publisher ! PublishChanges(Gossip(members = SortedSet(a1)), Gossip(members = SortedSet(a1, b1))) publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second) subscriber.expectNoMsg(1 second)
// but testActor is still subscriber // but testActor is still subscriber
expectMsg(MemberUp(b1)) expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
} }
} }
} }

View file

@ -17,6 +17,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
val a1 = Member(Address("akka", "sys", "a", 2552), Up) val a1 = Member(Address("akka", "sys", "a", 2552), Up)
val a2 = Member(Address("akka", "sys", "a", 2552), Joining) val a2 = Member(Address("akka", "sys", "a", 2552), Joining)
val a3 = Member(Address("akka", "sys", "a", 2552), Removed)
val b1 = Member(Address("akka", "sys", "b", 2552), Up) val b1 = Member(Address("akka", "sys", "b", 2552), Up)
val b2 = Member(Address("akka", "sys", "b", 2552), Removed) val b2 = Member(Address("akka", "sys", "b", 2552), Removed)
val b3 = Member(Address("akka", "sys", "b", 2552), Down) val b3 = Member(Address("akka", "sys", "b", 2552), Down)
@ -28,61 +29,82 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
val e2 = Member(Address("akka", "sys", "e", 2552), Up) val e2 = Member(Address("akka", "sys", "e", 2552), Up)
val e3 = Member(Address("akka", "sys", "e", 2552), Down) val e3 = Member(Address("akka", "sys", "e", 2552), Down)
def converge(gossip: Gossip): (Gossip, Set[Address]) =
((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) (gs._1.seen(m.address), gs._2 + m.address) }
"Domain events" must { "Domain events" must {
"be produced for new members" in { "be empty for the same gossip" in {
val g1 = Gossip(members = SortedSet(a1)) val g1 = Gossip(members = SortedSet(a1))
val g2 = Gossip(members = SortedSet(a1, b1, e1))
diff(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1))) diffUnreachable(g1, g1) must be(Seq.empty)
}
"be produced for new members" in {
val (g1, _) = converge(Gossip(members = SortedSet(a1)))
val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, e1)))
diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
} }
"be produced for changed status of members" in { "be produced for changed status of members" in {
val g1 = Gossip(members = SortedSet(a2, b1, c2)) val (g1, _) = converge(Gossip(members = SortedSet(a2, b1, c2)))
val g2 = Gossip(members = SortedSet(a1, b1, c1, e1)) val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, c1, e1)))
diff(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1))) diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
} }
"be produced for unreachable members" in { "be produced for members in unreachable" in {
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2)))
val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(b1, c2)))
diff(g1, g2) must be(Seq(MemberUnreachable(b1)))
}
"be produced for downed members" in {
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2))) val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2)))
val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3))) val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3)))
diff(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3))) diffMemberEvents(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3)))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(b3)))
diffSeen(g1, g2) must be(Seq.empty)
}
"be produced for downed members" in {
val (g1, _) = converge(Gossip(members = SortedSet(a1, b1)))
val (g2, _) = converge(Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3))))
diffMemberEvents(g1, g2) must be(Seq(MemberDowned(e3)))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(e3)))
diffSeen(g1, g2) must be(Seq.empty)
} }
"be produced for removed members" in { "be produced for removed members" in {
val g1 = Gossip(members = SortedSet(a1, d1), overview = GossipOverview(unreachable = Set(c2))) val (g1, _) = converge(Gossip(members = SortedSet(a1, d1)))
val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2))) val (g2, s2) = converge(Gossip(members = SortedSet(a1)))
diff(g1, g2) must be(Seq(MemberRemoved(d2))) diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(d2)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
} }
"be produced for convergence changes" in { "be produced for convergence changes" in {
val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address)
val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address)
diff(g1, g2) must be(Seq(ConvergenceChanged(false), diffMemberEvents(g1, g2) must be(Seq.empty)
SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) diffUnreachable(g1, g2) must be(Seq.empty)
diff(g2, g1) must be(Seq(ConvergenceChanged(true), diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address))))
SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) diffMemberEvents(g2, g1) must be(Seq.empty)
diffUnreachable(g2, g1) must be(Seq.empty)
diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address))))
} }
"be produced for leader changes" in { "be produced for leader changes" in {
val g1 = Gossip(members = SortedSet(a1, b1, e1)) val (g1, _) = converge(Gossip(members = SortedSet(a1, b1, e1)))
val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) val (g2, s2) = converge(Gossip(members = SortedSet(b1, e1)))
val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address)
diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address)))) diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(a3)))
diff(g2, g3) must be(Seq(ConvergenceChanged(true), diffUnreachable(g1, g2) must be(Seq.empty)
SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(b1.address))))
} }
} }
} }

View file

@ -67,7 +67,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
clusterView.self.address must be(selfAddress) clusterView.self.address must be(selfAddress)
clusterView.members.map(_.address) must be(Set(selfAddress)) clusterView.members.map(_.address) must be(Set(selfAddress))
clusterView.status must be(MemberStatus.Joining) clusterView.status must be(MemberStatus.Joining)
clusterView.convergence must be(true)
leaderActions() leaderActions()
awaitCond(clusterView.status == MemberStatus.Up) awaitCond(clusterView.status == MemberStatus.Up)
} }

View file

@ -28,8 +28,11 @@ class GossipSpec extends WordSpec with MustMatchers {
"A Gossip" must { "A Gossip" must {
"merge members by status priority" in { "reach convergence when it's empty" in {
Gossip().convergence must be(true)
}
"merge members by status priority" in {
val g1 = Gossip(members = SortedSet(a1, c1, e1)) val g1 = Gossip(members = SortedSet(a1, c1, e1))
val g2 = Gossip(members = SortedSet(a2, c2, e2)) val g2 = Gossip(members = SortedSet(a2, c2, e2))
@ -44,7 +47,6 @@ class GossipSpec extends WordSpec with MustMatchers {
} }
"merge unreachable by status priority" in { "merge unreachable by status priority" in {
val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1))) val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1)))
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))

View file

@ -4,8 +4,8 @@ import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent.ClusterDomainEvent; import akka.cluster.ClusterEvent.ClusterDomainEvent;
import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberJoined; import akka.cluster.ClusterEvent.MemberJoined;
import akka.cluster.ClusterEvent.MemberUnreachable;
import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
@ -26,8 +26,8 @@ public class SimpleClusterListener extends UntypedActor {
MemberUp mUp = (MemberUp) message; MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member()); log.info("Member is Up: {}", mUp.member());
} else if (message instanceof MemberUnreachable) { } else if (message instanceof UnreachableMember) {
MemberUnreachable mUnreachable = (MemberUnreachable) message; UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member()); log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof ClusterDomainEvent) { } else if (message instanceof ClusterDomainEvent) {

View file

@ -22,7 +22,7 @@ object SimpleClusterApp {
log.info("Member joined: {}", member) log.info("Member joined: {}", member)
case MemberUp(member) case MemberUp(member)
log.info("Member is Up: {}", member) log.info("Member is Up: {}", member)
case MemberUnreachable(member) case UnreachableMember(member)
log.info("Member detected as unreachable: {}", member) log.info("Member detected as unreachable: {}", member)
case _: ClusterDomainEvent // ignore case _: ClusterDomainEvent // ignore

View file

@ -15,10 +15,7 @@ import akka.actor.ReceiveTimeout
import akka.actor.RelativeActorPath import akka.actor.RelativeActorPath
import akka.actor.RootActorPath import akka.actor.RootActorPath
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent._
import akka.cluster.ClusterEvent.LeaderChanged
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.routing.FromConfig import akka.routing.FromConfig
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
@ -219,7 +216,10 @@ class StatsSampleClient(servicePath: String) extends Actor {
var nodes = Set.empty[Address] var nodes = Set.empty[Address]
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[UnreachableMember])
}
override def postStop(): Unit = { override def postStop(): Unit = {
cluster.unsubscribe(self) cluster.unsubscribe(self)
tickTask.cancel() tickTask.cancel()
@ -239,6 +239,7 @@ class StatsSampleClient(servicePath: String) extends Actor {
nodes = state.members.collect { case m if m.status == MemberStatus.Up m.address } nodes = state.members.collect { case m if m.status == MemberStatus.Up m.address }
case MemberUp(m) nodes += m.address case MemberUp(m) nodes += m.address
case other: MemberEvent nodes -= other.member.address case other: MemberEvent nodes -= other.member.address
case UnreachableMember(m) nodes -= m.address
} }
} }