Handle InitJoin messages correctly, #27169
* IntiJoin message changed from companion message to case class with config parameter when the join compat check was added. Some `case InitJoin =>` were not changed. * Resulting in unhandled InitJoin and no InitJoinNack replies.
This commit is contained in:
parent
04e83ffc7c
commit
74238f1cf2
2 changed files with 12 additions and 2 deletions
|
|
@ -444,7 +444,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
||||||
|
|
||||||
def uninitialized: Actor.Receive =
|
def uninitialized: Actor.Receive =
|
||||||
({
|
({
|
||||||
case InitJoin =>
|
case InitJoin(_) =>
|
||||||
logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
|
logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
|
||||||
sender() ! InitJoinNack(selfAddress)
|
sender() ! InitJoinNack(selfAddress)
|
||||||
case ClusterUserAction.JoinTo(address) =>
|
case ClusterUserAction.JoinTo(address) =>
|
||||||
|
|
@ -465,7 +465,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
||||||
({
|
({
|
||||||
case Welcome(from, gossip) =>
|
case Welcome(from, gossip) =>
|
||||||
welcome(joinWith, from, gossip)
|
welcome(joinWith, from, gossip)
|
||||||
case InitJoin =>
|
case InitJoin(_) =>
|
||||||
logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
|
logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
|
||||||
sender() ! InitJoinNack(selfAddress)
|
sender() ! InitJoinNack(selfAddress)
|
||||||
case ClusterUserAction.JoinTo(address) =>
|
case ClusterUserAction.JoinTo(address) =>
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
info.getOperations.length should be > (0)
|
info.getOperations.length should be > (0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"reply with InitJoinNack for InitJoin before joining" in {
|
||||||
|
system.actorSelection("/system/cluster/core/daemon") ! InitJoin(system.settings.config)
|
||||||
|
expectMsgType[InitJoinNack]
|
||||||
|
}
|
||||||
|
|
||||||
"initially become singleton cluster when joining itself and reach convergence" in {
|
"initially become singleton cluster when joining itself and reach convergence" in {
|
||||||
clusterView.members.size should ===(0)
|
clusterView.members.size should ===(0)
|
||||||
cluster.join(selfAddress)
|
cluster.join(selfAddress)
|
||||||
|
|
@ -75,6 +80,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
awaitAssert(clusterView.status should ===(MemberStatus.Up))
|
awaitAssert(clusterView.status should ===(MemberStatus.Up))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"reply with InitJoinAck for InitJoin after joining" in {
|
||||||
|
system.actorSelection("/system/cluster/core/daemon") ! InitJoin(system.settings.config)
|
||||||
|
expectMsgType[InitJoinAck]
|
||||||
|
}
|
||||||
|
|
||||||
"publish initial state as snapshot to subscribers" in {
|
"publish initial state as snapshot to subscribers" in {
|
||||||
try {
|
try {
|
||||||
cluster.subscribe(testActor, ClusterEvent.InitialStateAsSnapshot, classOf[ClusterEvent.MemberEvent])
|
cluster.subscribe(testActor, ClusterEvent.InitialStateAsSnapshot, classOf[ClusterEvent.MemberEvent])
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue