Merge pull request #1726 from akka/wip-3498-optimize-heartbeat-sender-patriknw

=clu #3498 Optimize ClusterHeartbeatSender
This commit is contained in:
Patrik Nordwall 2013-09-17 01:35:53 -07:00
commit 2a7d12701b
5 changed files with 120 additions and 11 deletions

View file

@ -40,6 +40,9 @@ final case class Address private (protocol: String, system: String, host: Option
*/
def hasGlobalScope: Boolean = host.isDefined
// store hashCode
@transient override lazy val hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this)
/**
* Returns the canonical String representation of this Address formatted as:
*

View file

@ -118,8 +118,10 @@ object ConsistentHash {
}
private def nodeHashFor(node: Any, vnode: Int): Int = {
val baseStr = node.toString + ":"
hashFor(baseStr + vnode)
import MurmurHash._
var h = startHash(node.##)
h = extendHash(h, vnode, startMagicA, startMagicB)
finalizeHash(h)
}
private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes)

View file

@ -329,20 +329,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A
private val nodeRing: immutable.SortedSet[Address] = {
implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b)
val ha = hashFor(a)
val hb = hashFor(b)
val ha = a.##
val hb = b.##
ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
}
immutable.SortedSet() ++ nodes
}
private def hashFor(node: Address): Int = node match {
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
case Address(_, _, Some(host), Some(port)) MurmurHash.stringHash(s"${host}:${port}")
case _ 0
}
/**
* Receivers for `selfAddress`. Cached for subsequent access.
*/

View file

@ -90,7 +90,8 @@ object Member {
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b)
// cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
if (a eq b) false
else if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}

View file

@ -0,0 +1,109 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.HashSet
import scala.concurrent.duration.Deadline
import org.scalatest.WordSpec
import org.scalatest.matchers.ShouldMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers {
val nodesSize = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.nodesSize").getOrElse("250").toInt
val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt
def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = {
val nodes = (1 to size).map(n Address("akka.tcp", "sys", "node-" + n, 2552)).toSet
val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552)
HeartbeatNodeRing(selfAddress, nodes, 5)
}
def createClusterHeartbeatSenderStateOfSize(size: Int): ClusterHeartbeatSenderState = {
val nodes = (1 to size).map(n Address("akka.tcp", "sys", "node-" + n, 2552)).to[HashSet]
val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552)
ClusterHeartbeatSenderState.empty(selfAddress, 5).reset(nodes)
}
val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize)
val heartbeatSenderState = createClusterHeartbeatSenderStateOfSize(nodesSize)
def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing Unit, times: Int): Unit =
for (i 1 to times) thunk(ring)
def checkThunkForState(state: ClusterHeartbeatSenderState, thunk: ClusterHeartbeatSenderState Unit, times: Int): Unit =
for (i 1 to times) thunk(state)
def myReceivers(ring: HeartbeatNodeRing): Unit = {
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers)
r.myReceivers.isEmpty should be(false)
}
def mySenders(ring: HeartbeatNodeRing): Unit = {
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers)
r.mySenders.isEmpty should be(false)
}
def reset(state: ClusterHeartbeatSenderState): Unit = {
val s = ClusterHeartbeatSenderState.empty(state.ring.selfAddress, state.ring.monitoredByNrOfMembers).reset(
state.ring.nodes.asInstanceOf[HashSet[Address]])
s.active.isEmpty should be(false)
}
def addMember(state: ClusterHeartbeatSenderState): Unit = {
val s = state.addMember(Address("akka.tcp", "sys", "new-node", 2552))
s.active.isEmpty should be(false)
}
def removeMember(state: ClusterHeartbeatSenderState): Unit = {
val s = state.removeMember(Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552))
s.active.isEmpty should be(false)
}
def addHeartbeatRequest(state: ClusterHeartbeatSenderState): Unit = {
val a = Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552)
val s = state.addHeartbeatRequest(a, Deadline.now)
s.active should contain(a)
}
s"HeartbeatNodeRing of size $nodesSize" must {
s"do a warm up run, $iterations times" in {
checkThunkForRing(heartbeatNodeRing, myReceivers, iterations)
}
s"produce myReceivers, $iterations times" in {
checkThunkForRing(heartbeatNodeRing, myReceivers, iterations)
}
s"produce mySenders, $iterations times" in {
checkThunkForRing(heartbeatNodeRing, mySenders, iterations)
}
}
s"ClusterHeartbeatSenderState of size $nodesSize" must {
s"do a warm up run, $iterations times" in {
checkThunkForState(heartbeatSenderState, reset, iterations)
}
s"reset, $iterations times" in {
checkThunkForState(heartbeatSenderState, reset, iterations)
}
s"addMember node, $iterations times" in {
checkThunkForState(heartbeatSenderState, addMember, iterations)
}
s"removeMember node, $iterations times" in {
checkThunkForState(heartbeatSenderState, removeMember, iterations)
}
s"addHeartbeatRequest node, $iterations times" in {
checkThunkForState(heartbeatSenderState, addHeartbeatRequest, iterations)
}
}
}