diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 16db6c2c8d..d94c62beb7 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -40,6 +40,9 @@ final case class Address private (protocol: String, system: String, host: Option */ def hasGlobalScope: Boolean = host.isDefined + // store hashCode + @transient override lazy val hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this) + /** * Returns the canonical String representation of this Address formatted as: * diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 8165db2720..d4cc129b29 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -118,8 +118,10 @@ object ConsistentHash { } private def nodeHashFor(node: Any, vnode: Int): Int = { - val baseStr = node.toString + ":" - hashFor(baseStr + vnode) + import MurmurHash._ + var h = startHash(node.##) + h = extendHash(h, vnode, startMagicA, startMagicB) + finalizeHash(h) } private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 240447095f..fb686f26cd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -329,20 +329,14 @@ private[cluster] case class HeartbeatNodeRing(selfAddress: Address, nodes: Set[A private val nodeRing: immutable.SortedSet[Address] = { implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ - val ha = hashFor(a) - val hb = hashFor(b) + val ha = a.## + val hb = b.## ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0) } immutable.SortedSet() ++ nodes } - private def hashFor(node: Address): Int = node match { - // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same - case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}") - case _ ⇒ 0 - } - /** * Receivers for `selfAddress`. Cached for subsequent access. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 0facef2125..8055ba0157 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -90,7 +90,8 @@ object Member { */ implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same - if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 + if (a eq b) false + else if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0) else false } diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala new file mode 100644 index 0000000000..4119a2d7f8 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.HashSet +import scala.concurrent.duration.Deadline +import org.scalatest.WordSpec +import org.scalatest.matchers.ShouldMatchers +import akka.actor.Address + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class HeartbeatNodeRingPerfSpec extends WordSpec with ShouldMatchers { + + val nodesSize = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.nodesSize").getOrElse("250").toInt + val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt + + def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = { + val nodes = (1 to size).map(n ⇒ Address("akka.tcp", "sys", "node-" + n, 2552)).toSet + val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552) + HeartbeatNodeRing(selfAddress, nodes, 5) + } + + def createClusterHeartbeatSenderStateOfSize(size: Int): ClusterHeartbeatSenderState = { + val nodes = (1 to size).map(n ⇒ Address("akka.tcp", "sys", "node-" + n, 2552)).to[HashSet] + val selfAddress = Address("akka.tcp", "sys", "node-" + (size / 2), 2552) + ClusterHeartbeatSenderState.empty(selfAddress, 5).reset(nodes) + } + + val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize) + val heartbeatSenderState = createClusterHeartbeatSenderStateOfSize(nodesSize) + + def checkThunkForRing(ring: HeartbeatNodeRing, thunk: HeartbeatNodeRing ⇒ Unit, times: Int): Unit = + for (i ← 1 to times) thunk(ring) + + def checkThunkForState(state: ClusterHeartbeatSenderState, thunk: ClusterHeartbeatSenderState ⇒ Unit, times: Int): Unit = + for (i ← 1 to times) thunk(state) + + def myReceivers(ring: HeartbeatNodeRing): Unit = { + val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers) + r.myReceivers.isEmpty should be(false) + } + + def mySenders(ring: HeartbeatNodeRing): Unit = { + val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers) + r.mySenders.isEmpty should be(false) + } + + def reset(state: ClusterHeartbeatSenderState): Unit = { + val s = ClusterHeartbeatSenderState.empty(state.ring.selfAddress, state.ring.monitoredByNrOfMembers).reset( + state.ring.nodes.asInstanceOf[HashSet[Address]]) + s.active.isEmpty should be(false) + } + + def addMember(state: ClusterHeartbeatSenderState): Unit = { + val s = state.addMember(Address("akka.tcp", "sys", "new-node", 2552)) + s.active.isEmpty should be(false) + } + + def removeMember(state: ClusterHeartbeatSenderState): Unit = { + val s = state.removeMember(Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552)) + s.active.isEmpty should be(false) + } + + def addHeartbeatRequest(state: ClusterHeartbeatSenderState): Unit = { + val a = Address("akka.tcp", "sys", "node-" + (nodesSize / 3), 2552) + val s = state.addHeartbeatRequest(a, Deadline.now) + s.active should contain(a) + } + + s"HeartbeatNodeRing of size $nodesSize" must { + + s"do a warm up run, $iterations times" in { + checkThunkForRing(heartbeatNodeRing, myReceivers, iterations) + } + + s"produce myReceivers, $iterations times" in { + checkThunkForRing(heartbeatNodeRing, myReceivers, iterations) + } + + s"produce mySenders, $iterations times" in { + checkThunkForRing(heartbeatNodeRing, mySenders, iterations) + } + } + + s"ClusterHeartbeatSenderState of size $nodesSize" must { + + s"do a warm up run, $iterations times" in { + checkThunkForState(heartbeatSenderState, reset, iterations) + } + + s"reset, $iterations times" in { + checkThunkForState(heartbeatSenderState, reset, iterations) + } + + s"addMember node, $iterations times" in { + checkThunkForState(heartbeatSenderState, addMember, iterations) + } + + s"removeMember node, $iterations times" in { + checkThunkForState(heartbeatSenderState, removeMember, iterations) + } + + s"addHeartbeatRequest node, $iterations times" in { + checkThunkForState(heartbeatSenderState, addHeartbeatRequest, iterations) + } + + } +}