Cluster heartbeat timings (#26757)
* Serializer for Heartbeat and HeartbeatRsp that includes sendTime and sequenceNr A future release will be required to use the serialiser once there has been a release with this PR so that old nodes can deserialise the new message. * Cross DC heartbeating sequenceNr and sendTime
This commit is contained in:
parent
f9d76aa030
commit
7c94367d13
11 changed files with 1744 additions and 100 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -1,3 +1,12 @@
|
||||||
|
# 26757 add timings to cluster heart beat messages
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterHeartbeatSender$Heartbeat$")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterHeartbeatSender$HeartbeatRsp$")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterHeartbeatSender*")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterHeartbeatReceiver.selfHeartbeatRsp")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterHeartbeatReceiver.this")
|
||||||
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterHeartbeatSender.heartbeatRsp")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.CrossDcHeartbeatSender.selfHeartbeat")
|
||||||
|
|
||||||
# #24710 remove internal ClusterReadView.refreshCurrentState
|
# #24710 remove internal ClusterReadView.refreshCurrentState
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.refreshCurrentState")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.refreshCurrentState")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -97,14 +97,26 @@ message CompatibleConfig {
|
||||||
****************************************/
|
****************************************/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Heartbeat
|
* Prior to version 2.5.24
|
||||||
* Sends an Address
|
* Heartbeat sends an Address
|
||||||
|
* Version 2.5.24 can deserialize this message but does not send it.
|
||||||
*/
|
*/
|
||||||
|
message Heartbeat {
|
||||||
|
required Address from = 1;
|
||||||
|
optional int64 sequenceNr = 2;
|
||||||
|
optional sint64 creationTime = 3;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HeartbeatRsp
|
* Prior to version 2.5.24
|
||||||
* Sends an UniqueAddress
|
* HeartbeatRsp Sends an UniqueAddress
|
||||||
|
* Version 2.5.24 can deserialize this message but does not send it.
|
||||||
*/
|
*/
|
||||||
|
message HeartBeatResponse {
|
||||||
|
required UniqueAddress from = 1;
|
||||||
|
optional int64 sequenceNr = 2;
|
||||||
|
optional int64 creationTime = 3;
|
||||||
|
}
|
||||||
|
|
||||||
/****************************************
|
/****************************************
|
||||||
* Cluster Gossip Messages
|
* Cluster Gossip Messages
|
||||||
|
|
|
||||||
|
|
@ -223,7 +223,7 @@ private[cluster] final class ClusterDaemon(joinConfigCompatChecker: JoinConfigCo
|
||||||
Props(classOf[ClusterCoreSupervisor], joinConfigCompatChecker).withDispatcher(context.props.dispatcher),
|
Props(classOf[ClusterCoreSupervisor], joinConfigCompatChecker).withDispatcher(context.props.dispatcher),
|
||||||
name = "core"))
|
name = "core"))
|
||||||
context.actorOf(
|
context.actorOf(
|
||||||
Props[ClusterHeartbeatReceiver].withDispatcher(context.props.dispatcher),
|
ClusterHeartbeatReceiver.props(() => Cluster(context.system)).withDispatcher(context.props.dispatcher),
|
||||||
name = "heartbeatReceiver")
|
name = "heartbeatReceiver")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,16 @@ import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
import akka.actor.{
|
||||||
import akka.actor.{ Actor, ActorLogging, ActorPath, ActorSelection, Address, DeadLetterSuppression, RootActorPath }
|
Actor,
|
||||||
|
ActorLogging,
|
||||||
|
ActorPath,
|
||||||
|
ActorSelection,
|
||||||
|
Address,
|
||||||
|
DeadLetterSuppression,
|
||||||
|
Props,
|
||||||
|
RootActorPath
|
||||||
|
}
|
||||||
import akka.cluster.ClusterEvent._
|
import akka.cluster.ClusterEvent._
|
||||||
import akka.remote.FailureDetectorRegistry
|
import akka.remote.FailureDetectorRegistry
|
||||||
import akka.remote.HeartbeatMessage
|
import akka.remote.HeartbeatMessage
|
||||||
|
|
@ -23,19 +31,20 @@ import akka.util.ccompat._
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
@ccompatUsedUntil213
|
@ccompatUsedUntil213
|
||||||
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
|
private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster) extends Actor with ActorLogging {
|
||||||
import ClusterHeartbeatSender._
|
import ClusterHeartbeatSender._
|
||||||
|
|
||||||
// Important - don't use Cluster(context.system) in constructor because that would
|
// Important - don't use Cluster(context.system) in constructor because that would
|
||||||
// cause deadlock. See startup sequence in ClusterDaemon.
|
// cause deadlock. See startup sequence in ClusterDaemon.
|
||||||
lazy val cluster = Cluster(context.system)
|
lazy val cluster = getCluster()
|
||||||
lazy val selfHeartbeatRsp = HeartbeatRsp(cluster.selfUniqueAddress)
|
|
||||||
lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
|
lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
|
||||||
|
|
||||||
def receive = {
|
def receive: Receive = {
|
||||||
case Heartbeat(from) =>
|
case hb: Heartbeat =>
|
||||||
if (verboseHeartbeat) cluster.ClusterLogger.logDebug("Heartbeat from [{}]", from)
|
// TODO log the sequence nr once serializer is enabled
|
||||||
sender() ! selfHeartbeatRsp
|
if (verboseHeartbeat) cluster.ClusterLogger.logDebug("Heartbeat from [{}]", hb.from)
|
||||||
|
sender() ! HeartbeatRsp(cluster.selfUniqueAddress, hb.sequenceNr, hb.creationTimeNanos)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -44,6 +53,7 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[cluster] object ClusterHeartbeatReceiver {
|
private[cluster] object ClusterHeartbeatReceiver {
|
||||||
|
|
||||||
|
def props(clusterFactory: () => Cluster): Props = Props(new ClusterHeartbeatReceiver(clusterFactory))
|
||||||
def name: String = "heartbeatReceiver"
|
def name: String = "heartbeatReceiver"
|
||||||
def path(address: Address): ActorPath =
|
def path(address: Address): ActorPath =
|
||||||
RootActorPath(address) / "system" / "cluster" / name
|
RootActorPath(address) / "system" / "cluster" / name
|
||||||
|
|
@ -57,12 +67,15 @@ private[cluster] object ClusterHeartbeatSender {
|
||||||
/**
|
/**
|
||||||
* Sent at regular intervals for failure detection.
|
* Sent at regular intervals for failure detection.
|
||||||
*/
|
*/
|
||||||
final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
|
final case class Heartbeat(from: Address, sequenceNr: Long, creationTimeNanos: Long)
|
||||||
|
extends ClusterMessage
|
||||||
|
with HeartbeatMessage
|
||||||
|
with DeadLetterSuppression
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sent as reply to [[Heartbeat]] messages.
|
* Sent as reply to [[Heartbeat]] messages.
|
||||||
*/
|
*/
|
||||||
final case class HeartbeatRsp(from: UniqueAddress)
|
final case class HeartbeatRsp(from: UniqueAddress, sequenceNr: Long, creationTimeNanos: Long)
|
||||||
extends ClusterMessage
|
extends ClusterMessage
|
||||||
with HeartbeatMessage
|
with HeartbeatMessage
|
||||||
with DeadLetterSuppression
|
with DeadLetterSuppression
|
||||||
|
|
@ -80,7 +93,7 @@ private[cluster] object ClusterHeartbeatSender {
|
||||||
* a few other nodes, which will reply and then this actor updates the
|
* a few other nodes, which will reply and then this actor updates the
|
||||||
* failure detector.
|
* failure detector.
|
||||||
*/
|
*/
|
||||||
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
|
private[cluster] class ClusterHeartbeatSender extends Actor with ActorLogging {
|
||||||
import ClusterHeartbeatSender._
|
import ClusterHeartbeatSender._
|
||||||
|
|
||||||
val cluster = Cluster(context.system)
|
val cluster = Cluster(context.system)
|
||||||
|
|
@ -93,7 +106,12 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
val filterInternalClusterMembers: Member => Boolean =
|
val filterInternalClusterMembers: Member => Boolean =
|
||||||
_.dataCenter == cluster.selfDataCenter
|
_.dataCenter == cluster.selfDataCenter
|
||||||
|
|
||||||
val selfHeartbeat = Heartbeat(selfAddress)
|
var sequenceNr = 0
|
||||||
|
|
||||||
|
def selfHeartbeat(): Heartbeat = {
|
||||||
|
sequenceNr += 1
|
||||||
|
Heartbeat(selfAddress, sequenceNr, System.nanoTime())
|
||||||
|
}
|
||||||
|
|
||||||
val failureDetector = cluster.failureDetector
|
val failureDetector = cluster.failureDetector
|
||||||
|
|
||||||
|
|
@ -141,7 +159,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
|
|
||||||
def active: Actor.Receive = {
|
def active: Actor.Receive = {
|
||||||
case HeartbeatTick => heartbeat()
|
case HeartbeatTick => heartbeat()
|
||||||
case HeartbeatRsp(from) => heartbeatRsp(from)
|
case response: HeartbeatRsp => heartbeatRsp(response)
|
||||||
case MemberRemoved(m, _) => removeMember(m)
|
case MemberRemoved(m, _) => removeMember(m)
|
||||||
case evt: MemberEvent => addMember(evt.member)
|
case evt: MemberEvent => addMember(evt.member)
|
||||||
case UnreachableMember(m) => unreachableMember(m)
|
case UnreachableMember(m) => unreachableMember(m)
|
||||||
|
|
@ -181,6 +199,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
state = state.reachableMember(m.uniqueAddress)
|
state = state.reachableMember(m.uniqueAddress)
|
||||||
|
|
||||||
def heartbeat(): Unit = {
|
def heartbeat(): Unit = {
|
||||||
|
val nextHB = selfHeartbeat()
|
||||||
state.activeReceivers.foreach { to =>
|
state.activeReceivers.foreach { to =>
|
||||||
if (failureDetector.isMonitoring(to.address)) {
|
if (failureDetector.isMonitoring(to.address)) {
|
||||||
if (verboseHeartbeat) logDebug("Heartbeat to [{}]", to.address)
|
if (verboseHeartbeat) logDebug("Heartbeat to [{}]", to.address)
|
||||||
|
|
@ -190,7 +209,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
// other side a chance to reply, and also trigger some resends if needed
|
// other side a chance to reply, and also trigger some resends if needed
|
||||||
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to))
|
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to))
|
||||||
}
|
}
|
||||||
heartbeatReceiver(to.address) ! selfHeartbeat
|
heartbeatReceiver(to.address) ! nextHB
|
||||||
}
|
}
|
||||||
|
|
||||||
checkTickInterval()
|
checkTickInterval()
|
||||||
|
|
@ -210,9 +229,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def heartbeatRsp(from: UniqueAddress): Unit = {
|
def heartbeatRsp(response: HeartbeatRsp): Unit = {
|
||||||
if (verboseHeartbeat) logDebug("Heartbeat response from [{}]", from.address)
|
// TODO: log response time and validate sequence nrs once serialisation of sendTime is released
|
||||||
state = state.heartbeatRsp(from)
|
if (verboseHeartbeat) logDebug("Heartbeat response from [{}]", response.from.address)
|
||||||
|
state = state.heartbeatRsp(response.from)
|
||||||
}
|
}
|
||||||
|
|
||||||
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
|
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ import scala.collection.immutable
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
@ccompatUsedUntil213
|
@ccompatUsedUntil213
|
||||||
private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogging {
|
private[cluster] class CrossDcHeartbeatSender extends Actor with ActorLogging {
|
||||||
import CrossDcHeartbeatSender._
|
import CrossDcHeartbeatSender._
|
||||||
|
|
||||||
val cluster = Cluster(context.system)
|
val cluster = Cluster(context.system)
|
||||||
|
|
@ -56,7 +56,12 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
||||||
|
|
||||||
val crossDcFailureDetector = cluster.crossDcFailureDetector
|
val crossDcFailureDetector = cluster.crossDcFailureDetector
|
||||||
|
|
||||||
val selfHeartbeat = ClusterHeartbeatSender.Heartbeat(selfAddress)
|
var sequenceNr: Long = 0
|
||||||
|
|
||||||
|
def nextHeartBeat() = {
|
||||||
|
sequenceNr += 1
|
||||||
|
ClusterHeartbeatSender.Heartbeat(selfAddress, sequenceNr, System.nanoTime())
|
||||||
|
}
|
||||||
|
|
||||||
var dataCentersState: CrossDcHeartbeatingState = CrossDcHeartbeatingState.init(
|
var dataCentersState: CrossDcHeartbeatingState = CrossDcHeartbeatingState.init(
|
||||||
selfDataCenter,
|
selfDataCenter,
|
||||||
|
|
@ -110,7 +115,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
||||||
|
|
||||||
def active: Actor.Receive = {
|
def active: Actor.Receive = {
|
||||||
case ClusterHeartbeatSender.HeartbeatTick => heartbeat()
|
case ClusterHeartbeatSender.HeartbeatTick => heartbeat()
|
||||||
case ClusterHeartbeatSender.HeartbeatRsp(from) => heartbeatRsp(from)
|
case ClusterHeartbeatSender.HeartbeatRsp(from, _, _) => heartbeatRsp(from)
|
||||||
case MemberRemoved(m, _) => removeMember(m)
|
case MemberRemoved(m, _) => removeMember(m)
|
||||||
case evt: MemberEvent => addMember(evt.member)
|
case evt: MemberEvent => addMember(evt.member)
|
||||||
case ClusterHeartbeatSender.ExpectedFirstHeartbeat(from) => triggerFirstHeartbeat(from)
|
case ClusterHeartbeatSender.ExpectedFirstHeartbeat(from) => triggerFirstHeartbeat(from)
|
||||||
|
|
@ -153,6 +158,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
||||||
}
|
}
|
||||||
|
|
||||||
def heartbeat(): Unit = {
|
def heartbeat(): Unit = {
|
||||||
|
val nextHB = nextHeartBeat()
|
||||||
dataCentersState.activeReceivers.foreach { to =>
|
dataCentersState.activeReceivers.foreach { to =>
|
||||||
if (crossDcFailureDetector.isMonitoring(to.address)) {
|
if (crossDcFailureDetector.isMonitoring(to.address)) {
|
||||||
if (verboseHeartbeat) logDebug("(Cross) Heartbeat to [{}]", to.address)
|
if (verboseHeartbeat) logDebug("(Cross) Heartbeat to [{}]", to.address)
|
||||||
|
|
@ -162,7 +168,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
||||||
// other side a chance to reply, and also trigger some resends if needed
|
// other side a chance to reply, and also trigger some resends if needed
|
||||||
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to))
|
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to))
|
||||||
}
|
}
|
||||||
heartbeatReceiver(to.address) ! selfHeartbeat
|
heartbeatReceiver(to.address) ! nextHB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -42,8 +42,12 @@ private[akka] object ClusterMessageSerializer {
|
||||||
val InitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$"
|
val InitJoinManifest = s"akka.cluster.InternalClusterAction$$InitJoin$$"
|
||||||
val InitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck"
|
val InitJoinAckManifest = s"akka.cluster.InternalClusterAction$$InitJoinAck"
|
||||||
val InitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack"
|
val InitJoinNackManifest = s"akka.cluster.InternalClusterAction$$InitJoinNack"
|
||||||
val HeartBeatManifest = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat"
|
// FIXME, remove in a later version (2.6?) and make 2.5.24+ a mandatory step for rolling upgrade
|
||||||
val HeartBeatRspManifest = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
|
val HeartBeatManifestPre2523 = s"akka.cluster.ClusterHeartbeatSender$$Heartbeat"
|
||||||
|
val HeartBeatRspManifest2523 = s"akka.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
|
||||||
|
|
||||||
|
val HeartBeatManifest = "HB"
|
||||||
|
val HeartBeatRspManifest = "HBR"
|
||||||
val ExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed"
|
val ExitingConfirmedManifest = s"akka.cluster.InternalClusterAction$$ExitingConfirmed"
|
||||||
val GossipStatusManifest = "akka.cluster.GossipStatus"
|
val GossipStatusManifest = "akka.cluster.GossipStatus"
|
||||||
val GossipEnvelopeManifest = "akka.cluster.GossipEnvelope"
|
val GossipEnvelopeManifest = "akka.cluster.GossipEnvelope"
|
||||||
|
|
@ -72,8 +76,8 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
|
||||||
case _: InternalClusterAction.InitJoin => InitJoinManifest
|
case _: InternalClusterAction.InitJoin => InitJoinManifest
|
||||||
case _: InternalClusterAction.InitJoinAck => InitJoinAckManifest
|
case _: InternalClusterAction.InitJoinAck => InitJoinAckManifest
|
||||||
case _: InternalClusterAction.InitJoinNack => InitJoinNackManifest
|
case _: InternalClusterAction.InitJoinNack => InitJoinNackManifest
|
||||||
case _: ClusterHeartbeatSender.Heartbeat => HeartBeatManifest
|
case _: ClusterHeartbeatSender.Heartbeat => HeartBeatManifestPre2523
|
||||||
case _: ClusterHeartbeatSender.HeartbeatRsp => HeartBeatRspManifest
|
case _: ClusterHeartbeatSender.HeartbeatRsp => HeartBeatRspManifest2523
|
||||||
case _: ExitingConfirmed => ExitingConfirmedManifest
|
case _: ExitingConfirmed => ExitingConfirmedManifest
|
||||||
case _: GossipStatus => GossipStatusManifest
|
case _: GossipStatus => GossipStatusManifest
|
||||||
case _: GossipEnvelope => GossipEnvelopeManifest
|
case _: GossipEnvelope => GossipEnvelopeManifest
|
||||||
|
|
@ -83,8 +87,8 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
|
||||||
}
|
}
|
||||||
|
|
||||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||||
case ClusterHeartbeatSender.Heartbeat(from) => addressToProtoByteArray(from)
|
case ClusterHeartbeatSender.Heartbeat(from, _, _) => addressToProtoByteArray(from)
|
||||||
case ClusterHeartbeatSender.HeartbeatRsp(from) => uniqueAddressToProtoByteArray(from)
|
case ClusterHeartbeatSender.HeartbeatRsp(from, _, _) => uniqueAddressToProtoByteArray(from)
|
||||||
case m: GossipEnvelope => gossipEnvelopeToProto(m).toByteArray
|
case m: GossipEnvelope => gossipEnvelopeToProto(m).toByteArray
|
||||||
case m: GossipStatus => gossipStatusToProto(m).toByteArray
|
case m: GossipStatus => gossipStatusToProto(m).toByteArray
|
||||||
case InternalClusterAction.Join(node, roles) => joinToProto(node, roles).toByteArray
|
case InternalClusterAction.Join(node, roles) => joinToProto(node, roles).toByteArray
|
||||||
|
|
@ -101,8 +105,10 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
|
||||||
}
|
}
|
||||||
|
|
||||||
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
|
||||||
|
case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes)
|
||||||
|
case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes)
|
||||||
case HeartBeatManifest => deserializeHeartBeat(bytes)
|
case HeartBeatManifest => deserializeHeartBeat(bytes)
|
||||||
case HeartBeatRspManifest => deserializeHeartBeatRsp(bytes)
|
case HeartBeatRspManifest => deserializeHeartBeatResponse(bytes)
|
||||||
case GossipStatusManifest => deserializeGossipStatus(bytes)
|
case GossipStatusManifest => deserializeGossipStatus(bytes)
|
||||||
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
|
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
|
||||||
case InitJoinManifest => deserializeInitJoin(bytes)
|
case InitJoinManifest => deserializeInitJoin(bytes)
|
||||||
|
|
@ -154,9 +160,9 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
|
||||||
case _ => throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
|
case _ => throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
|
||||||
}
|
}
|
||||||
|
|
||||||
private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
|
private[akka] def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
|
||||||
|
|
||||||
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = {
|
private[akka] def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = {
|
||||||
cm.UniqueAddress
|
cm.UniqueAddress
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setAddress(addressToProto(uniqueAddress.address))
|
.setAddress(addressToProto(uniqueAddress.address))
|
||||||
|
|
@ -277,12 +283,22 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
|
||||||
InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))
|
InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def deserializeHeartBeatRsp(bytes: Array[Byte]): ClusterHeartbeatSender.HeartbeatRsp = {
|
private def deserializeHeartBeatRspAsUniqueAddress(bytes: Array[Byte]): ClusterHeartbeatSender.HeartbeatRsp = {
|
||||||
ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))
|
ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes), -1, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def deserializeHeartBeat(bytes: Array[Byte]): ClusterHeartbeatSender.Heartbeat = {
|
private def deserializeHeartBeatAsAddress(bytes: Array[Byte]): ClusterHeartbeatSender.Heartbeat = {
|
||||||
ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))
|
ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes), -1, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
def deserializeHeartBeat(bytes: Array[Byte]): ClusterHeartbeatSender.Heartbeat = {
|
||||||
|
val hb = cm.Heartbeat.parseFrom(bytes)
|
||||||
|
ClusterHeartbeatSender.Heartbeat(addressFromProto(hb.getFrom), hb.getSequenceNr, hb.getCreationTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
def deserializeHeartBeatResponse(bytes: Array[Byte]): ClusterHeartbeatSender.HeartbeatRsp = {
|
||||||
|
val hbr = cm.HeartBeatResponse.parseFrom(bytes)
|
||||||
|
ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromProto(hbr.getFrom), hbr.getSequenceNr, hbr.getCreationTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def deserializeInitJoinNack(bytes: Array[Byte]): InternalClusterAction.InitJoinNack = {
|
private def deserializeInitJoinNack(bytes: Array[Byte]): InternalClusterAction.InitJoinNack = {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import akka.cluster.ClusterHeartbeatSender.{ Heartbeat, HeartbeatRsp }
|
||||||
|
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||||
|
|
||||||
|
class ClusterHeartbeatReceiverSpec extends AkkaSpec("""
|
||||||
|
akka.actor.provider = cluster
|
||||||
|
""".stripMargin) with ImplicitSender {
|
||||||
|
"ClusterHeartbeatReceiver" should {
|
||||||
|
"respond to heartbeats with the same sequenceNr and sendTime" in {
|
||||||
|
val heartBeater = system.actorOf(ClusterHeartbeatReceiver.props(() => Cluster(system)))
|
||||||
|
heartBeater ! Heartbeat(Cluster(system).selfAddress, 1, 2)
|
||||||
|
expectMsg(HeartbeatRsp(Cluster(system).selfUniqueAddress, 1, 2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import akka.actor.{ ActorSelection, Address, Props }
|
||||||
|
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
|
||||||
|
import akka.cluster.ClusterHeartbeatSender.Heartbeat
|
||||||
|
import akka.cluster.ClusterHeartbeatSenderSpec.TestClusterHeartBeatSender
|
||||||
|
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
|
||||||
|
|
||||||
|
object ClusterHeartbeatSenderSpec {
|
||||||
|
class TestClusterHeartBeatSender(probe: TestProbe) extends ClusterHeartbeatSender {
|
||||||
|
// don't register for cluster events
|
||||||
|
override def preStart(): Unit = {}
|
||||||
|
|
||||||
|
// override where the heart beats go to
|
||||||
|
override def heartbeatReceiver(address: Address): ActorSelection = {
|
||||||
|
context.actorSelection(probe.ref.path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ClusterHeartbeatSenderSpec extends AkkaSpec("""
|
||||||
|
akka.loglevel = DEBUG
|
||||||
|
akka.actor.provider = cluster
|
||||||
|
akka.cluster.failure-detector.heartbeat-interval = 0.2s
|
||||||
|
""".stripMargin) with ImplicitSender {
|
||||||
|
|
||||||
|
"ClusterHeartBeatSender" must {
|
||||||
|
"increment heart beat sequence nr" in {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val underTest = system.actorOf(Props(new TestClusterHeartBeatSender(probe)))
|
||||||
|
underTest ! CurrentClusterState()
|
||||||
|
underTest ! MemberUp(
|
||||||
|
Member(UniqueAddress(Address("akka", system.name), 1L), Set("dc-default")).copy(status = MemberStatus.Up))
|
||||||
|
|
||||||
|
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 1
|
||||||
|
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import akka.actor.{ ActorSelection, Address, Props }
|
||||||
|
import akka.cluster.ClusterEvent.CurrentClusterState
|
||||||
|
import akka.cluster.ClusterHeartbeatSender.Heartbeat
|
||||||
|
import akka.cluster.CrossDcHeartbeatSenderSpec.TestCrossDcHeartbeatSender
|
||||||
|
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
|
||||||
|
|
||||||
|
import scala.collection.immutable.SortedSet
|
||||||
|
|
||||||
|
object CrossDcHeartbeatSenderSpec {
|
||||||
|
class TestCrossDcHeartbeatSender(probe: TestProbe) extends CrossDcHeartbeatSender {
|
||||||
|
// disable register for cluster events
|
||||||
|
override def preStart(): Unit = {}
|
||||||
|
|
||||||
|
override def heartbeatReceiver(address: Address): ActorSelection = {
|
||||||
|
context.actorSelection(probe.ref.path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class CrossDcHeartbeatSenderSpec extends AkkaSpec("""
|
||||||
|
akka.loglevel = DEBUG
|
||||||
|
akka.actor.provider = cluster
|
||||||
|
akka.cluster.failure-detector.heartbeat-interval = 0.2s
|
||||||
|
akka.cluster.multi-data-center {
|
||||||
|
self-data-center = "dc1"
|
||||||
|
heartbeat-interval = 0.2s
|
||||||
|
}
|
||||||
|
""") with ImplicitSender {
|
||||||
|
"CrossDcHeartBeatSender" should {
|
||||||
|
"increment heart beat sequence nr" in {
|
||||||
|
val probe = TestProbe()
|
||||||
|
Cluster(system).join(Cluster(system).selfMember.address)
|
||||||
|
awaitAssert(Cluster(system).selfMember.status == MemberStatus.Up)
|
||||||
|
val underTest = system.actorOf(Props(new TestCrossDcHeartbeatSender(probe)))
|
||||||
|
underTest ! CurrentClusterState(
|
||||||
|
members = SortedSet(
|
||||||
|
Cluster(system).selfMember,
|
||||||
|
Member(UniqueAddress(Address("akka", system.name), 2L), Set("dc-dc2")).copy(status = MemberStatus.Up)))
|
||||||
|
|
||||||
|
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 1
|
||||||
|
probe.expectMsgType[Heartbeat].sequenceNr shouldEqual 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,11 +9,13 @@ import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
|
||||||
import akka.cluster.InternalClusterAction.CompatibleConfig
|
import akka.cluster.InternalClusterAction.CompatibleConfig
|
||||||
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
|
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
|
||||||
import akka.routing.RoundRobinPool
|
import akka.routing.RoundRobinPool
|
||||||
|
import akka.cluster.protobuf.msg.{ ClusterMessages => cm }
|
||||||
|
|
||||||
import collection.immutable.SortedSet
|
import collection.immutable.SortedSet
|
||||||
import akka.testkit.{ AkkaSpec, TestKit }
|
import akka.testkit.{ AkkaSpec, TestKit }
|
||||||
import com.github.ghik.silencer.silent
|
import com.github.ghik.silencer.silent
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
@silent
|
@silent
|
||||||
class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = cluster") {
|
class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = cluster") {
|
||||||
|
|
||||||
|
|
@ -59,8 +61,8 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
|
||||||
checkSerialization(InternalClusterAction.InitJoin(ConfigFactory.empty))
|
checkSerialization(InternalClusterAction.InitJoin(ConfigFactory.empty))
|
||||||
checkSerialization(InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)))
|
checkSerialization(InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)))
|
||||||
checkSerialization(InternalClusterAction.InitJoinNack(address))
|
checkSerialization(InternalClusterAction.InitJoinNack(address))
|
||||||
checkSerialization(ClusterHeartbeatSender.Heartbeat(address))
|
checkSerialization(ClusterHeartbeatSender.Heartbeat(address, -1, -1))
|
||||||
checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress))
|
checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1))
|
||||||
checkSerialization(InternalClusterAction.ExitingConfirmed(uniqueAddress))
|
checkSerialization(InternalClusterAction.ExitingConfirmed(uniqueAddress))
|
||||||
|
|
||||||
val node1 = VectorClock.Node("node1")
|
val node1 = VectorClock.Node("node1")
|
||||||
|
|
@ -168,6 +170,47 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
|
||||||
join.roles should be(Set(ClusterSettings.DcRolePrefix + "default"))
|
join.roles should be(Set(ClusterSettings.DcRolePrefix + "default"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"Rolling upgrades for heart beat message changes in 2.5.23" must {
|
||||||
|
// FIXME, add issue for serializing this as the new message type
|
||||||
|
|
||||||
|
"serialize heart beats as Address to support versions prior or 2.5.23" in {
|
||||||
|
serializer.manifest(ClusterHeartbeatSender.Heartbeat(a1.address, -1, -1)) should ===(
|
||||||
|
ClusterMessageSerializer.HeartBeatManifestPre2523)
|
||||||
|
}
|
||||||
|
|
||||||
|
"serialize heart beat responses as UniqueAddress to support versions prior to 2.5.23" in {
|
||||||
|
serializer.manifest(ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, -1, -1)) should ===(
|
||||||
|
ClusterMessageSerializer.HeartBeatRspManifest2523)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to deserialize HeartBeat protobuf message" in {
|
||||||
|
val hbProtobuf = cm.Heartbeat
|
||||||
|
.newBuilder()
|
||||||
|
.setFrom(serializer.addressToProto(a1.address))
|
||||||
|
.setSequenceNr(1)
|
||||||
|
.setCreationTime(2)
|
||||||
|
.build()
|
||||||
|
.toByteArray
|
||||||
|
|
||||||
|
serializer.fromBinary(hbProtobuf, ClusterMessageSerializer.HeartBeatManifest) should ===(
|
||||||
|
ClusterHeartbeatSender.Heartbeat(a1.address, 1, 2))
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to deserialize HeartBeatRsp probuf message" in {
|
||||||
|
val hbrProtobuf = cm.HeartBeatResponse
|
||||||
|
.newBuilder()
|
||||||
|
.setFrom(serializer.uniqueAddressToProto(a1.uniqueAddress))
|
||||||
|
.setSequenceNr(1)
|
||||||
|
.setCreationTime(2)
|
||||||
|
.build()
|
||||||
|
.toByteArray
|
||||||
|
|
||||||
|
serializer.fromBinary(hbrProtobuf, ClusterMessageSerializer.HeartBeatRspManifest) should ===(
|
||||||
|
ClusterHeartbeatSender.HeartbeatRsp(a1.uniqueAddress, 1, 2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"Cluster router pool" must {
|
"Cluster router pool" must {
|
||||||
"be serializable with no role" in {
|
"be serializable with no role" in {
|
||||||
checkSerialization(
|
checkSerialization(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue