Publish clean state when joining (PublishStart), see #2871
* The failure in JoinTwoClustersSpec was due to missing publishing of cluster events when clearing current state when joining * This fix is in the right direction, but joining clusters like this will need some design thought, creating ticket 2873 for that
This commit is contained in:
parent
5d53ec0c52
commit
943c438d5e
5 changed files with 31 additions and 11 deletions
|
|
@ -191,7 +191,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
|
|
||||||
// note that self is not initially member,
|
// note that self is not initially member,
|
||||||
// and the Gossip is not versioned for this 'Node' yet
|
// and the Gossip is not versioned for this 'Node' yet
|
||||||
var latestGossip: Gossip = Gossip()
|
var latestGossip: Gossip = Gossip.empty
|
||||||
|
|
||||||
var stats = ClusterStats()
|
var stats = ClusterStats()
|
||||||
|
|
||||||
|
|
@ -283,7 +283,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
def join(address: Address): Unit = {
|
def join(address: Address): Unit = {
|
||||||
if (!latestGossip.members.exists(_.address == address)) {
|
if (!latestGossip.members.exists(_.address == address)) {
|
||||||
// wipe our state since a node that joins a cluster must be empty
|
// wipe our state since a node that joins a cluster must be empty
|
||||||
latestGossip = Gossip()
|
latestGossip = Gossip.empty
|
||||||
// wipe the failure detector since we are starting fresh and shouldn't care about the past
|
// wipe the failure detector since we are starting fresh and shouldn't care about the past
|
||||||
failureDetector.reset()
|
failureDetector.reset()
|
||||||
// wipe the publisher since we are starting fresh
|
// wipe the publisher since we are starting fresh
|
||||||
|
|
@ -377,7 +377,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
||||||
def removing(address: Address): Unit = {
|
def removing(address: Address): Unit = {
|
||||||
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
|
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
|
||||||
// just cleaning up the gossip state
|
// just cleaning up the gossip state
|
||||||
latestGossip = Gossip()
|
latestGossip = Gossip.empty
|
||||||
publish(latestGossip)
|
publish(latestGossip)
|
||||||
context.become(removed)
|
context.become(removed)
|
||||||
// make sure the final (removed) state is published
|
// make sure the final (removed) state is published
|
||||||
|
|
|
||||||
|
|
@ -238,8 +238,8 @@ object ClusterEvent {
|
||||||
private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
|
private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
|
||||||
import InternalClusterAction._
|
import InternalClusterAction._
|
||||||
|
|
||||||
var latestGossip: Gossip = Gossip()
|
var latestGossip: Gossip = Gossip.empty
|
||||||
var latestConvergedGossip: Gossip = Gossip()
|
var latestConvergedGossip: Gossip = Gossip.empty
|
||||||
var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty
|
var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
@ -313,7 +313,11 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
|
|
||||||
def publish(event: AnyRef): Unit = eventStream publish event
|
def publish(event: AnyRef): Unit = eventStream publish event
|
||||||
|
|
||||||
def publishStart(): Unit = clearState()
|
def publishStart(): Unit =
|
||||||
|
if ((latestGossip ne Gossip.empty) || (latestConvergedGossip ne Gossip.empty)) {
|
||||||
|
clearState()
|
||||||
|
publishCurrentClusterState(None)
|
||||||
|
}
|
||||||
|
|
||||||
def publishDone(receiver: ActorRef): Unit = {
|
def publishDone(receiver: ActorRef): Unit = {
|
||||||
clearState()
|
clearState()
|
||||||
|
|
@ -321,7 +325,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
}
|
}
|
||||||
|
|
||||||
def clearState(): Unit = {
|
def clearState(): Unit = {
|
||||||
latestGossip = Gossip()
|
latestGossip = Gossip.empty
|
||||||
latestConvergedGossip = Gossip()
|
latestConvergedGossip = Gossip.empty
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,10 @@ import MemberStatus._
|
||||||
*/
|
*/
|
||||||
private[cluster] object Gossip {
|
private[cluster] object Gossip {
|
||||||
val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty
|
val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty
|
||||||
|
val empty: Gossip = new Gossip(Gossip.emptyMembers)
|
||||||
|
|
||||||
|
def apply(members: immutable.SortedSet[Member]) =
|
||||||
|
if (members.isEmpty) empty else empty.copy(members = members)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -49,8 +53,8 @@ private[cluster] object Gossip {
|
||||||
* removed node telling it to shut itself down.
|
* removed node telling it to shut itself down.
|
||||||
*/
|
*/
|
||||||
private[cluster] case class Gossip(
|
private[cluster] case class Gossip(
|
||||||
|
members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address
|
||||||
overview: GossipOverview = GossipOverview(),
|
overview: GossipOverview = GossipOverview(),
|
||||||
members: immutable.SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
|
|
||||||
version: VectorClock = VectorClock()) // vector clock version
|
version: VectorClock = VectorClock()) // vector clock version
|
||||||
extends ClusterMessage // is a serializable cluster message
|
extends ClusterMessage // is a serializable cluster message
|
||||||
with Versioned[Gossip] {
|
with Versioned[Gossip] {
|
||||||
|
|
@ -128,7 +132,7 @@ private[cluster] case class Gossip(
|
||||||
// 4. fresh seen table
|
// 4. fresh seen table
|
||||||
val mergedSeen = Map.empty[Address, VectorClock]
|
val mergedSeen = Map.empty[Address, VectorClock]
|
||||||
|
|
||||||
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedVClock)
|
Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -134,5 +134,17 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
|
||||||
expectMsg(MemberUp(c2))
|
expectMsg(MemberUp(c2))
|
||||||
expectMsgType[SeenChanged]
|
expectMsgType[SeenChanged]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"publish clean state when PublishStart" in {
|
||||||
|
publisher ! PublishChanges(g3)
|
||||||
|
expectMsg(MemberUp(b1))
|
||||||
|
expectMsg(MemberUp(c2))
|
||||||
|
expectMsgType[SeenChanged]
|
||||||
|
|
||||||
|
publisher ! PublishStart
|
||||||
|
expectMsgType[CurrentClusterState] must be(CurrentClusterState())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ class GossipSpec extends WordSpec with MustMatchers {
|
||||||
"A Gossip" must {
|
"A Gossip" must {
|
||||||
|
|
||||||
"reach convergence when it's empty" in {
|
"reach convergence when it's empty" in {
|
||||||
Gossip().convergence must be(true)
|
Gossip.empty.convergence must be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"merge members by status priority" in {
|
"merge members by status priority" in {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue